Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Assign clientId on connection start

  • Loading branch information...
commit 20595248b8ad2b7f5f0a4e99d30806402e7e48a2 1 parent 4f9951f
Daniel Gómez Ferro authored September 20, 2012
19  src/main/java/com/yahoo/pasc/paxos/client/ClientState.java
@@ -19,6 +19,7 @@
19 19
 import java.util.BitSet;
20 20
 
21 21
 import com.yahoo.pasc.ProcessState;
  22
+import com.yahoo.pasc.paxos.messages.Hello;
22 23
 import com.yahoo.pasc.paxos.messages.Request;
23 24
 
24 25
 public class ClientState implements ProcessState {
@@ -26,11 +27,13 @@
26 27
     int servers;
27 28
     int quorum;
28 29
     int connected;
  30
+    int disconnected;
29 31
     int inlineThreshold;
30 32
     TimestampMessage asyncMessages [];
31 33
 
32 34
     long timestamp = -1;
33 35
     Request pendingRequest;
  36
+    Hello pendingHello;
34 37
     BitSet acks = new BitSet();
35 38
 //    byte[] value;
36 39
     private int from;
@@ -77,6 +80,14 @@ public void setConnected(int connected) {
77 80
         this.connected = connected;
78 81
     }
79 82
 
  83
+    public int getDisconnected() {
  84
+        return disconnected;
  85
+    }
  86
+
  87
+    public void setDisconnected(int disconnected) {
  88
+        this.disconnected = disconnected;
  89
+    }
  90
+
80 91
     public long getTimestamp() {
81 92
         return timestamp;
82 93
     }
@@ -93,6 +104,14 @@ public void setPendingRequest(Request pendingRequest) {
93 104
         this.pendingRequest = pendingRequest;
94 105
     }
95 106
 
  107
+    public Hello getPendingHello() {
  108
+        return pendingHello;
  109
+    }
  110
+
  111
+    public void setPendingHello(Hello pendingHello) {
  112
+        this.pendingHello = pendingHello;
  113
+    }
  114
+
96 115
     public BitSet getAcks() {
97 116
         return acks;
98 117
     }
12  src/main/java/com/yahoo/pasc/paxos/client/PaxosClient.java
@@ -31,15 +31,19 @@
31 31
 
32 32
 import com.yahoo.pasc.PascRuntime;
33 33
 import com.yahoo.pasc.paxos.client.handlers.AsyncMessageHandler;
  34
+import com.yahoo.pasc.paxos.client.handlers.ByeHandler;
34 35
 import com.yahoo.pasc.paxos.client.handlers.HelloHandler;
35 36
 import com.yahoo.pasc.paxos.client.handlers.ReplyHandler;
  37
+import com.yahoo.pasc.paxos.client.handlers.ServerHelloHandler;
36 38
 import com.yahoo.pasc.paxos.client.handlers.SubmitHandler;
37 39
 import com.yahoo.pasc.paxos.client.handlers.TimeoutHandler;
38 40
 import com.yahoo.pasc.paxos.client.messages.Submit;
39 41
 import com.yahoo.pasc.paxos.client.messages.Timeout;
40 42
 import com.yahoo.pasc.paxos.messages.AsyncMessage;
  43
+import com.yahoo.pasc.paxos.messages.Bye;
41 44
 import com.yahoo.pasc.paxos.messages.Hello;
42 45
 import com.yahoo.pasc.paxos.messages.Reply;
  46
+import com.yahoo.pasc.paxos.messages.ServerHello;
43 47
 
44 48
 public class PaxosClient {
45 49
     public static void main(String[] args) throws Exception {
@@ -96,11 +100,13 @@ public static void main(String[] args) throws Exception {
96 100
 
97 101
             String [] serverHosts = host.split(",");
98 102
             
99  
-            HelloHandler hello = new HelloHandler();
  103
+            ServerHelloHandler shello = new ServerHelloHandler();
100 104
             ReplyHandler reply = new ReplyHandler();
101 105
             SubmitHandler submit = new SubmitHandler();
102 106
             TimeoutHandler tout = new TimeoutHandler();
103 107
             AsyncMessageHandler asyncm = new AsyncMessageHandler();
  108
+            ByeHandler bye = new ByeHandler();
  109
+            HelloHandler hello = new HelloHandler();
104 110
             
105 111
             Random rnd = new Random();
106 112
             
@@ -109,11 +115,13 @@ public static void main(String[] args) throws Exception {
109 115
                 ClientState clientState = new ClientState(clientId, servers, quorum, inlineThreshold, asynSize);
110 116
                 final PascRuntime<ClientState> runtime = new PascRuntime<ClientState>(protection);
111 117
                 runtime.setState(clientState);
112  
-                runtime.addHandler(Hello.class, hello);
  118
+                runtime.addHandler(ServerHello.class, shello);
113 119
                 runtime.addHandler(Reply.class, reply);
114 120
                 runtime.addHandler(Submit.class, submit);
115 121
                 runtime.addHandler(Timeout.class, tout);
116 122
                 runtime.addHandler(AsyncMessage.class, asyncm);
  123
+                runtime.addHandler(Bye.class, bye);
  124
+                runtime.addHandler(Hello.class, hello);
117 125
                 
118 126
                 final PaxosClientHandler handler = new PaxosClientHandler(runtime, new SimpleClient(requestSize), 
119 127
                         serverHosts, clientId, clients, timeout, zkConnection, executor);
57  src/main/java/com/yahoo/pasc/paxos/client/PaxosClientHandler.java
@@ -23,6 +23,7 @@
23 23
 import java.util.Arrays;
24 24
 import java.util.Collections;
25 25
 import java.util.List;
  26
+import java.util.Random;
26 27
 import java.util.Timer;
27 28
 import java.util.TimerTask;
28 29
 import java.util.concurrent.BlockingQueue;
@@ -32,7 +33,6 @@
32 33
 import org.apache.zookeeper.KeeperException;
33 34
 import org.apache.zookeeper.WatchedEvent;
34 35
 import org.apache.zookeeper.Watcher;
35  
-import org.apache.zookeeper.Watcher.Event.EventType;
36 36
 import org.apache.zookeeper.ZooKeeper;
37 37
 import org.jboss.netty.bootstrap.ClientBootstrap;
38 38
 import org.jboss.netty.channel.Channel;
@@ -62,12 +62,14 @@
62 62
 import com.yahoo.pasc.paxos.messages.Hello;
63 63
 import com.yahoo.pasc.paxos.messages.InlineRequest;
64 64
 import com.yahoo.pasc.paxos.messages.Request;
  65
+import com.yahoo.pasc.paxos.messages.ServerHello;
65 66
 import com.yahoo.pasc.paxos.messages.serialization.ManualDecoder;
66 67
 import com.yahoo.pasc.paxos.messages.serialization.ManualEncoder;
67 68
 
68 69
 public class PaxosClientHandler extends SimpleChannelUpstreamHandler implements PaxosInterface, Watcher {
69 70
 
70 71
     private static final Logger LOG = LoggerFactory.getLogger(PaxosClientHandler.class);
  72
+    private static final int MAX_CLIENTS = 4096;
71 73
 
72 74
     private int clientId;
73 75
     private int clients;
@@ -107,6 +109,7 @@ public ChannelPipeline getPipeline() throws Exception {
107 109
         this.servers = servers;
108 110
         this.serverChannels = new Channel[servers.length];
109 111
         Arrays.fill(payload, (byte) 5);
  112
+        generateClientId();
110 113
         for (int i = 0; i < servers.length; ++i) {
111 114
             tryConnect(i);
112 115
         }
@@ -171,8 +174,17 @@ public void start() throws KeeperException, InterruptedException {
171 174
 
172 175
     @Override
173 176
     public void process(WatchedEvent event) {
174  
-        if (event.getType() != EventType.NodeChildrenChanged)
  177
+        switch (event.getType()) {
  178
+        case NodeChildrenChanged:
  179
+        case None:
  180
+            break;
  181
+        default:
175 182
             return;
  183
+        }
  184
+        
  185
+        if (event.getState() != Watcher.Event.KeeperState.SyncConnected) {
  186
+            return;
  187
+        }
176 188
 
177 189
         try {
178 190
             leader = getLeadership(zk.getChildren(ELECTION_PATH, this));
@@ -230,12 +242,23 @@ synchronized void resubmitRequest(long timestamp) {
230 242
             timer.schedule(resubmit, timeout);
231 243
         }
232 244
     }
  245
+    
  246
+    private Random random = new Random();
233 247
 
234  
-    @Override
235  
-    public synchronized void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
  248
+    private void generateClientId() {
  249
+        clientId = random.nextInt(MAX_CLIENTS);
  250
+    }
  251
+    
  252
+    private void sendHello(Channel c) {
236 253
         Hello hello = new Hello(clientId);
237 254
         hello.storeReplica(hello);
238  
-        e.getChannel().write(hello);
  255
+        runtime.handleMessage(hello);
  256
+        c.write(hello);
  257
+    }
  258
+
  259
+    @Override
  260
+    public synchronized void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
  261
+        sendHello(e.getChannel());
239 262
     }
240 263
 
241 264
     private long messagesReceived = 0;
@@ -245,9 +268,10 @@ public synchronized void channelConnected(ChannelHandlerContext ctx, ChannelStat
245 268
     @Override
246 269
     public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
247 270
         Object message = e.getMessage();
248  
-        if (message instanceof Hello) {
249  
-            Hello hello = (Hello) e.getMessage();
250  
-            serverChannels[hello.getClientId()] = e.getChannel();
  271
+        if (message instanceof ServerHello) {
  272
+            ServerHello hello = (ServerHello) e.getMessage();
  273
+            serverChannels[hello.getServerId()] = e.getChannel();
  274
+            LOG.warn("Received hello: " + message);
251 275
         }
252 276
         LOG.trace("Received {}", message);
253 277
         List<Message> messages = runtime.handleMessage((Message) message);
@@ -257,9 +281,15 @@ public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent
257 281
             if (m instanceof Connected) {
258 282
                 lastTime = System.nanoTime();
259 283
 
260  
-//                new Thread(new ThroughputMonitor()).start();
  284
+                // new Thread(new ThroughputMonitor()).start();
261 285
 
262 286
                 clientInterface.connected();
  287
+            } else if (m instanceof Reconnect) {
  288
+                generateClientId();
  289
+                for (Channel c : serverChannels) {
  290
+                    if (c != null)
  291
+                        sendHello(c);
  292
+                }
263 293
             } else if (m instanceof Received) {
264 294
                 if (resubmit != null)
265 295
                     resubmit.cancel();
@@ -434,22 +464,23 @@ public void run() {
434 464
                 }
435 465
                 System.out.println(String.format("Throughput: %8.8f m/s", (messagesReceived - startMessagges)
436 466
                         / ((double) (System.currentTimeMillis() - startTime) / 1000)));
437  
-                System.out.println(String.format("Latency: %4.4f ms", (totalTime / (double) latencyReceived) / 1000000));
  467
+                System.out
  468
+                        .println(String.format("Latency: %4.4f ms", (totalTime / (double) latencyReceived) / 1000000));
438 469
 
439 470
                 barrier.close();
440  
-    
  471
+
441 472
                 for (Channel c : serverChannels) {
442 473
                     if (c != null) {
443 474
                         c.close();
444 475
                     }
445 476
                 }
446  
-    
  477
+
447 478
                 try {
448 479
                     Thread.sleep(2000);
449 480
                 } catch (InterruptedException e) {
450 481
                     // ignore
451 482
                 }
452  
-    
  483
+
453 484
                 System.exit(0);
454 485
             } catch (KeeperException e) {
455 486
                 LOG.error("Couldn't measure throughput", e);
33  src/main/java/com/yahoo/pasc/paxos/client/Reconnect.java
... ...
@@ -0,0 +1,33 @@
  1
+/**
  2
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License. See accompanying LICENSE file.
  15
+ */
  16
+
  17
+package com.yahoo.pasc.paxos.client;
  18
+
  19
+import com.yahoo.pasc.Message;
  20
+
  21
+public class Reconnect extends Message {
  22
+
  23
+    @Override
  24
+    protected boolean verify() {
  25
+        return true;
  26
+    }
  27
+
  28
+    @Override
  29
+    public void storeReplica(Message m) {
  30
+
  31
+    }
  32
+
  33
+}
67  src/main/java/com/yahoo/pasc/paxos/client/handlers/ByeHandler.java
... ...
@@ -0,0 +1,67 @@
  1
+/**
  2
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License. See accompanying LICENSE file.
  15
+ */
  16
+
  17
+package com.yahoo.pasc.paxos.client.handlers;
  18
+
  19
+import java.util.Arrays;
  20
+import java.util.List;
  21
+
  22
+import com.yahoo.pasc.Message;
  23
+import com.yahoo.pasc.MessageHandler;
  24
+import com.yahoo.pasc.paxos.client.ClientState;
  25
+import com.yahoo.pasc.paxos.client.Reconnect;
  26
+import com.yahoo.pasc.paxos.messages.Bye;
  27
+import com.yahoo.pasc.paxos.messages.Hello;
  28
+
  29
+public class ByeHandler implements MessageHandler<Bye, ClientState, Reconnect> {
  30
+
  31
+    @Override
  32
+    public boolean guardPredicate(Bye receivedMessage) {
  33
+        return true;
  34
+    }
  35
+
  36
+    @Override
  37
+    public List<Reconnect> processMessage(Bye bye, ClientState state) {
  38
+        List<Reconnect> descriptors = null;
  39
+        if (!matches(bye, state)) {
  40
+            return null;
  41
+        }
  42
+        int disconnected = state.getDisconnected();
  43
+        disconnected++;
  44
+        state.setDisconnected(disconnected);
  45
+        int maxFaults = state.getServers() / 2 - 1;
  46
+        if (disconnected == maxFaults) {
  47
+            // Send the first message if connected to all servers
  48
+            descriptors = Arrays.asList(new Reconnect());
  49
+        }
  50
+        return descriptors;
  51
+    }
  52
+
  53
+    private boolean matches(Bye bye, ClientState state) {
  54
+        Hello hello = state.getPendingHello();
  55
+        if (hello == null)
  56
+            return false;
  57
+        return hello.getClientId() == bye.getClientId();
  58
+    }
  59
+
  60
+    @Override
  61
+    public List<Message> getOutputMessages(ClientState state, List<Reconnect> descriptors) {
  62
+        if (descriptors != null && descriptors.size() > 0) {
  63
+            return Arrays.<Message> asList(new Reconnect());
  64
+        }
  65
+        return null;
  66
+    }
  67
+}
25  src/main/java/com/yahoo/pasc/paxos/client/handlers/HelloHandler.java
@@ -22,10 +22,9 @@
22 22
 import com.yahoo.pasc.Message;
23 23
 import com.yahoo.pasc.MessageHandler;
24 24
 import com.yahoo.pasc.paxos.client.ClientState;
25  
-import com.yahoo.pasc.paxos.client.Connected;
26 25
 import com.yahoo.pasc.paxos.messages.Hello;
27 26
 
28  
-public class HelloHandler implements MessageHandler<Hello, ClientState, Connected> {
  27
+public class HelloHandler implements MessageHandler<Hello, ClientState, Hello> {
29 28
 
30 29
     @Override
31 30
     public boolean guardPredicate(Hello receivedMessage) {
@@ -33,23 +32,15 @@ public boolean guardPredicate(Hello receivedMessage) {
33 32
     }
34 33
 
35 34
     @Override
36  
-    public List<Connected> processMessage(Hello hello, ClientState state) {
37  
-        List<Connected> descriptors = null;
38  
-        int connected = state.getConnected();
39  
-        connected++;
40  
-        state.setConnected(connected);
41  
-        if (connected == state.getServers()) {
42  
-            // Send the first message if connected to all servers
43  
-            descriptors = Arrays.asList(new Connected());
44  
-        }
45  
-        return descriptors;
  35
+    public List<Hello> processMessage(Hello hello, ClientState state) {
  36
+        state.setPendingHello(hello);
  37
+        state.setConnected(0);
  38
+        state.setDisconnected(0);
  39
+        return Arrays.asList(hello);
46 40
     }
47 41
 
48 42
     @Override
49  
-    public List<Message> getOutputMessages(ClientState state, List<Connected> descriptors) {
50  
-        if (descriptors != null && descriptors.size() > 0) {
51  
-            return Arrays.<Message> asList(new Connected());
52  
-        }
53  
-        return null;
  43
+    public List<Message> getOutputMessages(ClientState state, List<Hello> messages) {
  44
+        return Arrays.<Message>asList(messages.get(0));
54 45
     }
55 46
 }
73  src/main/java/com/yahoo/pasc/paxos/client/handlers/ServerHelloHandler.java
... ...
@@ -0,0 +1,73 @@
  1
+/**
  2
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License. See accompanying LICENSE file.
  15
+ */
  16
+
  17
+package com.yahoo.pasc.paxos.client.handlers;
  18
+
  19
+import java.util.Arrays;
  20
+import java.util.List;
  21
+
  22
+import org.slf4j.Logger;
  23
+import org.slf4j.LoggerFactory;
  24
+
  25
+import com.yahoo.pasc.Message;
  26
+import com.yahoo.pasc.MessageHandler;
  27
+import com.yahoo.pasc.paxos.client.ClientState;
  28
+import com.yahoo.pasc.paxos.client.Connected;
  29
+import com.yahoo.pasc.paxos.client.PaxosClientHandler;
  30
+import com.yahoo.pasc.paxos.messages.Hello;
  31
+import com.yahoo.pasc.paxos.messages.ServerHello;
  32
+
  33
+public class ServerHelloHandler implements MessageHandler<ServerHello, ClientState, Connected> {
  34
+    private static final Logger LOG = LoggerFactory.getLogger(ServerHelloHandler.class);
  35
+
  36
+    @Override
  37
+    public boolean guardPredicate(ServerHello receivedMessage) {
  38
+        return true;
  39
+    }
  40
+
  41
+    @Override
  42
+    public List<Connected> processMessage(ServerHello hello, ClientState state) {
  43
+        List<Connected> descriptors = null;
  44
+        if (!matches(hello, state)) {
  45
+            return null;
  46
+        }
  47
+        int connected = state.getConnected();
  48
+        connected++;
  49
+        state.setConnected(connected);
  50
+        if (connected == state.getQuorum()  ) {
  51
+            LOG.warn("Connected");
  52
+            state.setClientId(hello.getClientId());
  53
+            // Send the first message if connected to all servers
  54
+            descriptors = Arrays.asList(new Connected());
  55
+        }
  56
+        return descriptors;
  57
+    }
  58
+
  59
+    private boolean matches(ServerHello serverHello, ClientState state) {
  60
+        Hello hello = state.getPendingHello();
  61
+        if (hello == null)
  62
+            return false;
  63
+        return hello.getClientId() == serverHello.getClientId();
  64
+    }
  65
+
  66
+    @Override
  67
+    public List<Message> getOutputMessages(ClientState state, List<Connected> descriptors) {
  68
+        if (descriptors != null && descriptors.size() > 0) {
  69
+            return Arrays.<Message> asList(new Connected());
  70
+        }
  71
+        return null;
  72
+    }
  73
+}
71  src/main/java/com/yahoo/pasc/paxos/messages/Bye.java
... ...
@@ -0,0 +1,71 @@
  1
+/**
  2
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License. See accompanying LICENSE file.
  15
+ */
  16
+
  17
+package com.yahoo.pasc.paxos.messages;
  18
+
  19
+import java.io.Serializable;
  20
+
  21
+import com.yahoo.pasc.CloneableDeep;
  22
+import com.yahoo.pasc.EqualsDeep;
  23
+
  24
+public class Bye extends PaxosMessage implements Serializable, CloneableDeep<Bye>, EqualsDeep<Bye> {
  25
+
  26
+    private static final long serialVersionUID = -3957750297988762927L;
  27
+
  28
+    int clientId;
  29
+    int serverId;
  30
+
  31
+    public Bye(int clientId, int serverId) {
  32
+        super();
  33
+        this.clientId = clientId;
  34
+        this.serverId = serverId;
  35
+    }
  36
+
  37
+    public Bye() {
  38
+
  39
+    }
  40
+
  41
+    public int getClientId() {
  42
+        return clientId;
  43
+    }
  44
+
  45
+    public void setClientId(int clientId) {
  46
+        this.clientId = clientId;
  47
+    }
  48
+
  49
+    public int getServerId() {
  50
+        return serverId;
  51
+    }
  52
+
  53
+    public void setServerId(int serverId) {
  54
+        this.serverId = serverId;
  55
+    }
  56
+
  57
+    @Override
  58
+    public Bye cloneDeep() {
  59
+        return new Bye(clientId, serverId);
  60
+    }
  61
+
  62
+    @Override
  63
+    public boolean equalsDeep(Bye other) {
  64
+        return clientId == other.clientId && serverId == other.serverId;
  65
+    }
  66
+
  67
+    @Override
  68
+    public String toString() {
  69
+        return String.format("{Bye sent from %d to %d %s}", serverId, clientId, super.toString());
  70
+    }
  71
+}
6  src/main/java/com/yahoo/pasc/paxos/messages/MessageType.java
@@ -19,7 +19,7 @@
19 19
 public enum MessageType {
20 20
 
21 21
     REQUEST, INLINEREQ, REPLY, ACCEPT, ACCEPTED, DIGEST, EXECUTE, PREREPLY, HELLO, PREPARE, PREPARED, LEAD_CHANGE,
22  
-    ASYNC_MESSAGE, CONTROL;
  22
+    ASYNC_MESSAGE, CONTROL, SERVERHELLO, BYE;
23 23
     
24 24
     public static MessageType getMessageType(PaxosMessage m) {
25 25
         if (m instanceof InlineRequest)
@@ -50,6 +50,10 @@ public static MessageType getMessageType(PaxosMessage m) {
50 50
             return ASYNC_MESSAGE;
51 51
         if (m instanceof ControlMessage)
52 52
             return CONTROL;
  53
+        if (m instanceof ServerHello)
  54
+            return SERVERHELLO;
  55
+        if (m instanceof Bye)
  56
+            return BYE;
53 57
         throw new IllegalArgumentException("Unknown message type: " + m);
54 58
     }
55 59
 }
71  src/main/java/com/yahoo/pasc/paxos/messages/ServerHello.java
... ...
@@ -0,0 +1,71 @@
  1
+/**
  2
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License. See accompanying LICENSE file.
  15
+ */
  16
+
  17
+package com.yahoo.pasc.paxos.messages;
  18
+
  19
+import java.io.Serializable;
  20
+
  21
+import com.yahoo.pasc.CloneableDeep;
  22
+import com.yahoo.pasc.EqualsDeep;
  23
+
  24
+public class ServerHello extends PaxosMessage implements Serializable, CloneableDeep<ServerHello>, EqualsDeep<ServerHello> {
  25
+
  26
+    private static final long serialVersionUID = -3957750297988762927L;
  27
+
  28
+    int clientId;
  29
+    int serverId;
  30
+
  31
+    public ServerHello(int clientId, int serverId) {
  32
+        super();
  33
+        this.clientId = clientId;
  34
+        this.serverId = serverId;
  35
+    }
  36
+
  37
+    public ServerHello() {
  38
+
  39
+    }
  40
+
  41
+    public int getClientId() {
  42
+        return clientId;
  43
+    }
  44
+
  45
+    public void setClientId(int clientId) {
  46
+        this.clientId = clientId;
  47
+    }
  48
+
  49
+    public int getServerId() {
  50
+        return serverId;
  51
+    }
  52
+
  53
+    public void setServerId(int serverId) {
  54
+        this.serverId = serverId;
  55
+    }
  56
+
  57
+    @Override
  58
+    public ServerHello cloneDeep() {
  59
+        return new ServerHello(clientId, serverId);
  60
+    }
  61
+
  62
+    @Override
  63
+    public boolean equalsDeep(ServerHello other) {
  64
+        return clientId == other.clientId && serverId == other.serverId;
  65
+    }
  66
+    
  67
+    @Override
  68
+    public String toString() {
  69
+        return String.format("{ServerHello sent from %d to %d %s}", serverId, clientId, super.toString());
  70
+    }
  71
+}
18  src/main/java/com/yahoo/pasc/paxos/messages/serialization/ManualDecoder.java
@@ -30,6 +30,7 @@
30 30
 import com.yahoo.pasc.paxos.messages.Accept;
31 31
 import com.yahoo.pasc.paxos.messages.Accepted;
32 32
 import com.yahoo.pasc.paxos.messages.AsyncMessage;
  33
+import com.yahoo.pasc.paxos.messages.Bye;
33 34
 import com.yahoo.pasc.paxos.messages.ControlMessage;
34 35
 import com.yahoo.pasc.paxos.messages.Digest;
35 36
 import com.yahoo.pasc.paxos.messages.Hello;
@@ -40,6 +41,7 @@
40 41
 import com.yahoo.pasc.paxos.messages.Prepared;
41 42
 import com.yahoo.pasc.paxos.messages.Reply;
42 43
 import com.yahoo.pasc.paxos.messages.Request;
  44
+import com.yahoo.pasc.paxos.messages.ServerHello;
43 45
 import com.yahoo.pasc.paxos.state.ClientTimestamp;
44 46
 import com.yahoo.pasc.paxos.state.DigestidDigest;
45 47
 import com.yahoo.pasc.paxos.state.InstanceRecord;
@@ -69,7 +71,7 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
69 71
 
70 72
         int length = buf.readInt();
71 73
         length -= 4; // length has already been read
72  
-
  74
+        
73 75
         if (buf.readableBytes() < length) {
74 76
             buf.resetReaderIndex();
75 77
             return null;
@@ -78,6 +80,10 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
78 80
         long crc = buf.readLong();
79 81
         length -= 8; // crc has been read
80 82
 
  83
+        if (length == 0) {
  84
+            LOG.warn("Length is 0.");
  85
+        }
  86
+
81 87
         byte[] bytearray = new byte[length];
82 88
         buf.markReaderIndex();
83 89
         buf.readBytes(bytearray, 0, length);
@@ -182,6 +188,16 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
182 188
             h.setCRC(crc);
183 189
             return h;
184 190
         }
  191
+        case SERVERHELLO: {
  192
+            ServerHello h = new ServerHello(buf.readInt(), buf.readInt());
  193
+            h.setCRC(crc);
  194
+            return h;
  195
+        }
  196
+        case BYE: {
  197
+            Bye h = new Bye(buf.readInt(), buf.readInt());
  198
+            h.setCRC(crc);
  199
+            return h;
  200
+        }
185 201
         case PREPARE: {
186 202
             int senderId = buf.readInt();
187 203
             int ballot = buf.readInt();
36  src/main/java/com/yahoo/pasc/paxos/messages/serialization/ManualEncoder.java
@@ -31,6 +31,7 @@
31 31
 import com.yahoo.pasc.paxos.messages.Accept;
32 32
 import com.yahoo.pasc.paxos.messages.Accepted;
33 33
 import com.yahoo.pasc.paxos.messages.AsyncMessage;
  34
+import com.yahoo.pasc.paxos.messages.Bye;
34 35
 import com.yahoo.pasc.paxos.messages.ControlMessage;
35 36
 import com.yahoo.pasc.paxos.messages.Digest;
36 37
 import com.yahoo.pasc.paxos.messages.Hello;
@@ -40,6 +41,7 @@
40 41
 import com.yahoo.pasc.paxos.messages.Prepared;
41 42
 import com.yahoo.pasc.paxos.messages.Reply;
42 43
 import com.yahoo.pasc.paxos.messages.Request;
  44
+import com.yahoo.pasc.paxos.messages.ServerHello;
43 45
 import com.yahoo.pasc.paxos.state.ClientTimestamp;
44 46
 import com.yahoo.pasc.paxos.state.InstanceRecord;
45 47
 
@@ -115,6 +117,12 @@ public static int getSize(PaxosMessage msg, boolean encodeCRC) {
115 117
         case HELLO:
116 118
             result += 4;
117 119
             break;
  120
+        case SERVERHELLO:
  121
+            result += 8;
  122
+            break;
  123
+        case BYE:
  124
+            result += 8;
  125
+            break;
118 126
         case PREPARE:
119 127
             result += 16;
120 128
             break;
@@ -127,6 +135,8 @@ public static int getSize(PaxosMessage msg, boolean encodeCRC) {
127 135
         case CONTROL:
128 136
             result += 8 + ((ControlMessage) msg).getControlMessage().length;
129 137
             break;
  138
+        default:
  139
+            throw new RuntimeException("unknown size for message " + msg + " with type " + type);
130 140
         }
131 141
         return result;
132 142
     }
@@ -145,7 +155,7 @@ public ChannelBuffer encode(PaxosMessage msg, boolean encodeCRC, int size, Chann
145 155
 
146 156
         switch (type) {
147 157
         case REQUEST:
148  
-        case INLINEREQ:{
  158
+        case INLINEREQ: {
149 159
             Request s = (Request) msg;
150 160
             buffer.writeInt(s.getClientId());
151 161
             buffer.writeLong(s.getTimestamp());
@@ -174,7 +184,7 @@ public ChannelBuffer encode(PaxosMessage msg, boolean encodeCRC, int size, Chann
174 184
                 buffer.writeInt(-1);
175 185
             } else {
176 186
                 buffer.writeInt(requests.length);
177  
-                for (byte [] request : requests) {
  187
+                for (byte[] request : requests) {
178 188
                     if (request == null) {
179 189
                         buffer.writeInt(-1);
180 190
                     } else {
@@ -214,6 +224,18 @@ public ChannelBuffer encode(PaxosMessage msg, boolean encodeCRC, int size, Chann
214 224
             buffer.writeInt(h.getClientId());
215 225
             break;
216 226
         }
  227
+        case SERVERHELLO: {
  228
+            ServerHello h = (ServerHello) msg;
  229
+            buffer.writeInt(h.getClientId());
  230
+            buffer.writeInt(h.getServerId());
  231
+            break;
  232
+        }
  233
+        case BYE: {
  234
+            Bye b = (Bye) msg;
  235
+            buffer.writeInt(b.getClientId());
  236
+            buffer.writeInt(b.getServerId());
  237
+            break;
  238
+        }
217 239
         case PREPARE: {
218 240
             Prepare p = (Prepare) msg;
219 241
             buffer.writeInt(p.getSenderId());
@@ -228,22 +250,22 @@ public ChannelBuffer encode(PaxosMessage msg, boolean encodeCRC, int size, Chann
228 250
             buffer.writeInt(pd.getReplyBallot());
229 251
 
230 252
             buffer.writeInt(pd.getAcceptedSize());
231  
-            InstanceRecord [] accepted = pd.getAcceptedReqs();
232  
-            for (int i = 0; i < pd.getAcceptedSize(); i++){
  253
+            InstanceRecord[] accepted = pd.getAcceptedReqs();
  254
+            for (int i = 0; i < pd.getAcceptedSize(); i++) {
233 255
                 InstanceRecord currAccepted = accepted[i];
234 256
                 buffer.writeLong(currAccepted.getIid());
235 257
                 buffer.writeInt(currAccepted.getBallot());
236 258
                 buffer.writeInt(currAccepted.getArraySize());
237 259
                 ClientTimestamp[] ct = currAccepted.getClientTimestamps();
238  
-                for (int j = 0; j < currAccepted.getArraySize(); j++){
  260
+                for (int j = 0; j < currAccepted.getArraySize(); j++) {
239 261
                     buffer.writeInt(ct[j].getClientId());
240 262
                     buffer.writeLong(ct[j].getTimestamp());
241 263
                 }
242 264
             }
243 265
 
244 266
             buffer.writeInt(pd.getLearnedSize());
245  
-            Accept [] learned = pd.getLearnedReqs();
246  
-            for (int i = 0; i < pd.getLearnedSize(); i++){
  267
+            Accept[] learned = pd.getLearnedReqs();
  268
+            for (int i = 0; i < pd.getLearnedSize(); i++) {
247 269
                 Accept currLearned = learned[i];
248 270
                 encode(currLearned, true, ManualEncoder.getSize(currLearned, true), buffer);
249 271
             }
2  src/main/java/com/yahoo/pasc/paxos/server/ServerConnection.java
@@ -29,6 +29,6 @@
29 29
     public void addClient(int clientId, Channel channel);
30 30
 
31 31
     public void run() throws IOException;
32  
-    
  32
+
33 33
     public void close() throws IOException;
34 34
 }
5  src/main/java/com/yahoo/pasc/paxos/server/ServerHandler.java
@@ -35,14 +35,16 @@
35 35
 import com.yahoo.pasc.Message;
36 36
 import com.yahoo.pasc.PascRuntime;
37 37
 import com.yahoo.pasc.paxos.messages.Accept;
  38
+import com.yahoo.pasc.paxos.messages.Bye;
38 39
 import com.yahoo.pasc.paxos.messages.ControlMessage;
39 40
 import com.yahoo.pasc.paxos.messages.DigestToSM;
40 41
 import com.yahoo.pasc.paxos.messages.Execute;
  42
+import com.yahoo.pasc.paxos.messages.Hello;
41 43
 import com.yahoo.pasc.paxos.messages.LeadershipChange;
42 44
 import com.yahoo.pasc.paxos.messages.MessageType;
43 45
 import com.yahoo.pasc.paxos.messages.PaxosMessage;
44 46
 import com.yahoo.pasc.paxos.messages.PreReply;
45  
-import com.yahoo.pasc.paxos.messages.Hello;
  47
+import com.yahoo.pasc.paxos.messages.ServerHello;
46 48
 import com.yahoo.pasc.paxos.state.PaxosState;
47 49
 import com.yahoo.pasc.paxos.statemachine.ControlHandler;
48 50
 import com.yahoo.pasc.paxos.statemachine.Response;
@@ -108,7 +110,6 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
108 110
             if (message instanceof Hello) {
109 111
                 Hello r = (Hello) e.getMessage();
110 112
                 serverConnection.addClient(r.getClientId(), e.getChannel());
111  
-                serverConnection.forward(Arrays.<Message> asList(r));
112 113
                 return;
113 114
             } else if (message instanceof Accept) {
114 115
                 accepts++;
64  src/main/java/com/yahoo/pasc/paxos/server/tcp/TcpServer.java
@@ -20,6 +20,7 @@
20 20
 import java.net.InetAddress;
21 21
 import java.net.InetSocketAddress;
22 22
 import java.net.UnknownHostException;
  23
+import java.util.Arrays;
23 24
 import java.util.List;
24 25
 import java.util.concurrent.ConcurrentHashMap;
25 26
 import java.util.concurrent.ConcurrentMap;
@@ -51,9 +52,11 @@
51 52
 import com.yahoo.pasc.PascRuntime;
52 53
 import com.yahoo.pasc.paxos.Barrier;
53 54
 import com.yahoo.pasc.paxos.messages.AsyncMessage;
  55
+import com.yahoo.pasc.paxos.messages.Bye;
54 56
 import com.yahoo.pasc.paxos.messages.Hello;
55 57
 import com.yahoo.pasc.paxos.messages.Prepared;
56 58
 import com.yahoo.pasc.paxos.messages.Reply;
  59
+import com.yahoo.pasc.paxos.messages.ServerHello;
57 60
 import com.yahoo.pasc.paxos.messages.serialization.ManualEncoder;
58 61
 import com.yahoo.pasc.paxos.server.LeaderElection;
59 62
 import com.yahoo.pasc.paxos.server.PipelineFactory;
@@ -64,7 +67,7 @@
64 67
 import com.yahoo.pasc.paxos.statemachine.StateMachine;
65 68
 
66 69
 public class TcpServer implements ServerConnection {
67  
-    
  70
+
68 71
     private static final Logger LOG = LoggerFactory.getLogger(TcpServer.class);
69 72
 
70 73
     private String servers[];
@@ -74,13 +77,14 @@
74 77
     private ExecutorService workerExecutor;
75 78
 
76 79
     private ChannelGroup serverChannels = new DefaultChannelGroup("servers");
77  
-    private ConcurrentMap<Integer, Channel> indexedServerChannels = new ConcurrentHashMap<Integer, Channel>(1024, 0.75f, 32);
  80
+    private ConcurrentMap<Integer, Channel> indexedServerChannels = new ConcurrentHashMap<Integer, Channel>(1024,
  81
+            0.75f, 32);
78 82
     private ConcurrentMap<Integer, Channel> clientChannels = new ConcurrentHashMap<Integer, Channel>(1024, 0.75f, 32);
79 83
 
80 84
     private int port;
81 85
     private int threads;
82 86
     private int id;
83  
-    
  87
+
84 88
     private EncoderEmbedder<ChannelBuffer> embedder = new EncoderEmbedder<ChannelBuffer>(new ManualEncoder());
85 89
 
86 90
     private Channel serverChannel;
@@ -93,20 +97,18 @@
93 97
 
94 98
     private Barrier barrier;
95 99
 
96  
-    public TcpServer(PascRuntime<PaxosState> runtime, StateMachine sm, ControlHandler controlHandler, String zk, 
97  
-            String servers[], int port, int threads, final int id, boolean twoStages) 
98  
-                    throws IOException, KeeperException {
  100
+    public TcpServer(PascRuntime<PaxosState> runtime, StateMachine sm, ControlHandler controlHandler, String zk,
  101
+            String servers[], int port, int threads, final int id, boolean twoStages) throws IOException,
  102
+            KeeperException {
99 103
         this.bossExecutor = Executors.newCachedThreadPool();
100 104
         this.workerExecutor = Executors.newCachedThreadPool();
101 105
         this.executionHandler = new ExecutionHandler(new MemoryAwareThreadPoolExecutor(1, 1024 * 1024,
102  
-                1024 * 1024 * 1024, 10, TimeUnit.SECONDS,
103  
-                new ObjectSizeEstimator() {
  106
+                1024 * 1024 * 1024, 10, TimeUnit.SECONDS, new ObjectSizeEstimator() {
104 107
                     @Override
105 108
                     public int estimateSize(Object o) {
106 109
                         return 1024;
107 110
                     }
108  
-                },
109  
-                new ThreadFactory() {
  111
+                }, new ThreadFactory() {
110 112
                     private int count = 0;
111 113
 
112 114
                     @Override
@@ -117,7 +119,8 @@ public Thread newThread(Runnable r) {
117 119
         this.channelHandler = new ServerHandler(runtime, sm, controlHandler, this);
118 120
         this.channelPipelineFactory = new PipelineFactory(channelHandler, executionHandler, twoStages);
119 121
         this.leaderElection = new LeaderElection(zk, id, this.channelHandler);
120  
-        this.barrier = new Barrier(new ZooKeeper(zk, 2000, leaderElection), "/paxos_srv_barrier", "" + id, servers.length);
  122
+        this.barrier = new Barrier(new ZooKeeper(zk, 2000, leaderElection), "/paxos_srv_barrier", "" + id,
  123
+                servers.length);
121 124
         this.servers = servers;
122 125
         this.port = port;
123 126
         this.threads = threads;
@@ -149,7 +152,7 @@ public void forward(List<Message> messages) {
149 152
             if (msg == null) {
150 153
                 continue;
151 154
             }
152  
-            
  155
+
153 156
             if (msg instanceof Reply) {
154 157
                 Reply reply = (Reply) msg;
155 158
                 int clientId = reply.getClientId();
@@ -172,11 +175,9 @@ public void forward(List<Message> messages) {
172 175
                 embedder.offer(asyncMessage);
173 176
                 ChannelBuffer encoded = embedder.poll();
174 177
                 clientChannel.write(encoded);
175  
-            } else if (msg instanceof Hello) {
176  
-                Hello hello = (Hello) msg;
  178
+            } else if (msg instanceof ServerHello) {
  179
+                ServerHello hello = (ServerHello) msg;
177 180
                 int clientId = hello.getClientId();
178  
-                hello.setClientId(id);
179  
-                hello.storeReplica(hello);
180 181
                 Channel clientChannel = clientChannels.get(clientId);
181 182
                 if (clientChannel == null) {
182 183
                     LOG.error("Client {} not yet connected. Cannot send reply.", clientId);
@@ -185,6 +186,17 @@ public void forward(List<Message> messages) {
185 186
                 embedder.offer(hello);
186 187
                 ChannelBuffer encoded = embedder.poll();
187 188
                 clientChannel.write(encoded);
  189
+            } else if (msg instanceof Bye) {
  190
+                Bye bye = (Bye) msg;
  191
+                int clientId = bye.getClientId();
  192
+                Channel clientChannel = clientChannels.get(clientId);
  193
+                if (clientChannel == null) {
  194
+                    LOG.error("Client {} not yet connected. Cannot send reply.", clientId);
  195
+                    continue;
  196
+                }
  197
+                embedder.offer(bye);
  198
+                ChannelBuffer encoded = embedder.poll();
  199
+                clientChannel.write(encoded);
188 200
             } else if (msg instanceof Prepared) {
189 201
                 Prepared prepared = (Prepared) msg;
190 202
                 int receiver = prepared.getReceiver();
@@ -201,20 +213,28 @@ public void forward(List<Message> messages) {
201 213
                 ChannelBuffer encoded = embedder.poll();
202 214
                 serverChannels.write(encoded);
203 215
             }
204  
-            
  216
+
205 217
         }
206 218
     }
207  
-    
  219
+
208 220
     @Override
209 221
     public void addClient(int clientId, Channel channel) {
210  
-        if (!clientChannels.containsKey(clientId)) {
  222
+        if (clientChannels.containsKey(clientId)) {
  223
+            Bye bye = new Bye(clientId, id);
  224
+            bye.storeReplica(bye);
  225
+            forward(Arrays.<Message> asList(bye));
  226
+        } else {
211 227
             LOG.debug("Adding client " + clientId + " " + channel);
212 228
             clientChannels.put(clientId, channel);
  229
+            ServerHello sh = new ServerHello(clientId, id);
  230
+            sh.storeReplica(sh);
  231
+            forward(Arrays.<Message> asList(sh));
213 232
         }
214 233
     }
215 234
 
216 235
     private void startServer() {
217  
-        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossExecutor, workerExecutor, threads));
  236
+        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossExecutor, workerExecutor,
  237
+                threads));
218 238
 
219 239
         bootstrap.setPipelineFactory(channelPipelineFactory);
220 240
 
@@ -225,7 +245,7 @@ private void startServer() {
225 245
         try {
226 246
             LOG.warn("Bound :" + serverChannel + " at " + InetAddress.getLocalHost().getHostName());
227 247
         } catch (UnknownHostException e) {
228  
-            //ignore
  248
+            // ignore
229 249
         }
230 250
     }
231 251
 
@@ -267,7 +287,7 @@ private void setupConnections() {
267 287
         }
268 288
 
269 289
     }
270  
-    
  290
+
271 291
     @Override
272 292
     public void close() throws IOException {