Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added control messages (e.g. HeartBeats)

  • Loading branch information...
commit e961fcfb12f238c965bd0c8f418a67999b7ccbb9 1 parent ab193a0
@dgomezferro authored
View
12 src/main/java/com/yahoo/pasc/paxos/client/PaxosClientHandler.java
@@ -58,6 +58,7 @@
import com.yahoo.pasc.paxos.client.messages.Received;
import com.yahoo.pasc.paxos.client.messages.Submit;
import com.yahoo.pasc.paxos.client.messages.Timeout;
+import com.yahoo.pasc.paxos.messages.ControlMessage;
import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.InlineRequest;
import com.yahoo.pasc.paxos.messages.Request;
@@ -155,6 +156,13 @@ public void submitNewRequest(byte[] request) {
}
}
+ @Override
+ public void submitControlMessage(byte[] controlMessage) {
+ ControlMessage cm = new ControlMessage(clientId, controlMessage);
+ cm.storeReplica(cm);
+ send(cm);
+ }
+
private static final String ELECTION_PATH = "/pasc_election";
public void start() throws KeeperException, InterruptedException {
@@ -250,10 +258,12 @@ public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent
if (m instanceof Connected) {
lastTime = System.nanoTime();
- new Thread(new ThroughputMonitor()).start();
+// new Thread(new ThroughputMonitor()).start();
clientInterface.connected();
} else if (m instanceof Received) {
+ if (resubmit != null)
+ resubmit.cancel();
++messagesReceived;
if (messagesReceived % period == 0) {
long currentTime = System.nanoTime();
View
3  src/main/java/com/yahoo/pasc/paxos/client/PaxosInterface.java
@@ -17,6 +17,7 @@
package com.yahoo.pasc.paxos.client;
public interface PaxosInterface {
+ public void submitNewRequest(byte[] request);
- public void submitNewRequest(byte [] request);
+ public void submitControlMessage(byte[] controlMessage);
}
View
7 src/main/java/com/yahoo/pasc/paxos/messages/AsyncMessage.java
@@ -67,4 +67,11 @@ public boolean equalsDeep(AsyncMessage other) {
}
return (this.clientId == other.clientId && this.serverId == other.serverId && this.timestamp == other.timestamp);
}
+
+ @Override
+ public String toString() {
+ return "AsyncMessage [clientId=" + clientId + ", serverId=" + serverId + ", timestamp=" + timestamp
+ + ", message=" + Arrays.toString(message) + "]";
+ }
+
}
View
78 src/main/java/com/yahoo/pasc/paxos/messages/ControlMessage.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.pasc.paxos.messages;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.yahoo.pasc.CloneableDeep;
+import com.yahoo.pasc.EqualsDeep;
+
+public class ControlMessage extends PaxosMessage implements Serializable, EqualsDeep<ControlMessage>, CloneableDeep<ControlMessage> {
+
+ private static final long serialVersionUID = 1111659280353033430L;
+
+ int clientId;
+ byte[] controlMessage;
+
+ public ControlMessage() {
+ }
+
+ public ControlMessage(int clientId, byte[] controlMessage) {
+ super();
+ this.clientId = clientId;
+ this.controlMessage = controlMessage;
+ }
+
+ public int getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(int clientId) {
+ this.clientId = clientId;
+ }
+
+ public byte[] getControlMessage() {
+ return controlMessage;
+ }
+
+ public void setControlMessage(byte[] controlMessage) {
+ this.controlMessage = controlMessage;
+ }
+
+ @Override
+ public String toString() {
+ String requestStr = Arrays.toString(controlMessage);
+ if (requestStr.length() > 20) {
+ requestStr = requestStr.substring(0, 20) + " ... ]";
+ }
+ return String.format("{ControlMessage %s sent from %d %s}", requestStr, clientId, super.toString());
+ }
+
+ public ControlMessage cloneDeep (){
+ byte [] resArray = new byte[this.controlMessage.length];
+ System.arraycopy(controlMessage, 0, resArray, 0, controlMessage.length);
+ return new ControlMessage(this.clientId, resArray);
+ }
+
+ public boolean equalsDeep(ControlMessage other){
+ if (!Arrays.equals(other.controlMessage, this.controlMessage)) {
+ return false;
+ }
+ return (this.clientId == other.clientId);
+ }
+}
View
4 src/main/java/com/yahoo/pasc/paxos/messages/MessageType.java
@@ -19,7 +19,7 @@
public enum MessageType {
REQUEST, INLINEREQ, REPLY, ACCEPT, ACCEPTED, DIGEST, EXECUTE, PREREPLY, HELLO, PREPARE, PREPARED, LEAD_CHANGE,
- ASYNC_MESSAGE;
+ ASYNC_MESSAGE, CONTROL;
public static MessageType getMessageType(PaxosMessage m) {
if (m instanceof InlineRequest)
@@ -48,6 +48,8 @@ public static MessageType getMessageType(PaxosMessage m) {
return LEAD_CHANGE;
if (m instanceof AsyncMessage)
return ASYNC_MESSAGE;
+ if (m instanceof ControlMessage)
+ return CONTROL;
throw new IllegalArgumentException("Unknown message type: " + m);
}
}
View
4 src/main/java/com/yahoo/pasc/paxos/messages/PaxosMessage.java
@@ -62,7 +62,7 @@ private ChannelBuffer getSerializedMessage(Message m) {
// return copy;
}
- public long computeCRC(Message m, byte [] bytearray) {
+ private long computeCRC(byte [] bytearray) {
Checksum crc32 = CRC32Pool.getCRC32();
crc32.reset();
@@ -80,7 +80,7 @@ public long computeCRC(Message m, byte [] bytearray) {
private long computeCRC(Message m) {
ChannelBuffer bytecopy = getSerializedMessage(m);
byte [] bytearray = bytecopy.array();
- return computeCRC(m, bytearray);
+ return computeCRC(bytearray);
}
@Override
View
13 src/main/java/com/yahoo/pasc/paxos/messages/serialization/ManualDecoder.java
@@ -30,6 +30,7 @@
import com.yahoo.pasc.paxos.messages.Accept;
import com.yahoo.pasc.paxos.messages.Accepted;
import com.yahoo.pasc.paxos.messages.AsyncMessage;
+import com.yahoo.pasc.paxos.messages.ControlMessage;
import com.yahoo.pasc.paxos.messages.Digest;
import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.InlineRequest;
@@ -94,6 +95,9 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
}
if (result != crc) {
+ byte b = buf.readByte();
+ MessageType type = MessageType.values()[b];
+ LOG.error("Invalid CRC for {}. Expected {} Actual {}", new Object[] {type, crc, result});
throw new InputMessageException("Invalid CRC", null, null);
}
@@ -234,6 +238,15 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
am.setCRC(crc);
return am;
}
+ case CONTROL: {
+ int clientId = buf.readInt();
+ len = buf.readInt();
+ byte [] message = new byte [len];
+ buf.readBytes(message);
+ ControlMessage cm = new ControlMessage(clientId, message);
+ cm.setCRC(crc);
+ return cm;
+ }
}
buf.resetReaderIndex();
throw new IllegalArgumentException("Unknown message type " + b + " " + type);
View
12 src/main/java/com/yahoo/pasc/paxos/messages/serialization/ManualEncoder.java
@@ -31,6 +31,7 @@
import com.yahoo.pasc.paxos.messages.Accept;
import com.yahoo.pasc.paxos.messages.Accepted;
import com.yahoo.pasc.paxos.messages.AsyncMessage;
+import com.yahoo.pasc.paxos.messages.ControlMessage;
import com.yahoo.pasc.paxos.messages.Digest;
import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.MessageType;
@@ -122,6 +123,10 @@ public static int getSize(PaxosMessage msg, boolean encodeCRC) {
break;
case ASYNC_MESSAGE:
result += 20 + ((AsyncMessage) msg).getMessage().length;
+ break;
+ case CONTROL:
+ result += 8 + ((ControlMessage) msg).getControlMessage().length;
+ break;
}
return result;
}
@@ -258,6 +263,13 @@ public ChannelBuffer encode(PaxosMessage msg, boolean encodeCRC, int size, Chann
buffer.writeBytes(am.getMessage());
break;
}
+ case CONTROL: {
+ ControlMessage cm = (ControlMessage) msg;
+ buffer.writeInt(cm.getClientId());
+ buffer.writeInt(cm.getControlMessage().length);
+ buffer.writeBytes(cm.getControlMessage());
+ break;
+ }
default:
throw new UnsupportedOperationException("Unknown message type " + type);
}
View
11 src/main/java/com/yahoo/pasc/paxos/server/PaxosServer.java
@@ -67,7 +67,7 @@ public static void main(String[] args) throws SecurityException, NoSuchFieldExce
Option id = new Option("i", true, "client id");
Option port = new Option("p", true, "port used by server");
Option buffer = new Option("b", true, "number of batched messages");
- Option clients = new Option("c", true, "clients (hostname:port,...)");
+// Option clients = new Option("c", true, "clients (hostname:port,...)");
Option servers = new Option("s", true, "servers (hostname:port,...)");
Option maxInstances = new Option("m", true, "max number of instances");
Option anm = new Option("a", false, "protection against ANM faults");
@@ -83,7 +83,7 @@ public static void main(String[] args) throws SecurityException, NoSuchFieldExce
Option zookeeper = new Option("z", true, "zookeeper connection string");
options = new Options();
- options.addOption(id).addOption(port).addOption(buffer).addOption(clients).addOption(servers)
+ options.addOption(id).addOption(port).addOption(buffer).addOption(servers)
.addOption(threads).addOption(anm).addOption(udp).addOption(maxInstances) //.addOption(leader)
.addOption(cWindow).addOption(digests).addOption(ckPeriod).addOption(inlineThresh)
.addOption(twoStages).addOption(digestQuorum).addOption(leaderReplies).addOption(zookeeper);
@@ -94,7 +94,7 @@ public static void main(String[] args) throws SecurityException, NoSuchFieldExce
line = parser.parse(options, args);
String serverAddresses[] = line.hasOption('s') ? line.getOptionValue('s').split(",") : new String[] { "10.78.36.104:20548", "10.78.36.104:20748" };
- String clientAddresses[] = line.hasOption('c') ? line.getOptionValue('c').split(",") : new String[] { "localhost:9000" };
+// String clientAddresses[] = line.hasOption('c') ? line.getOptionValue('c').split(",") : new String[] { "localhost:9000" };
String zookeeper = line.hasOption('z') ? line.getOptionValue('z') : "localhost:2181";
int serverId = line.hasOption('i') ? Integer.parseInt(line.getOptionValue('i')) : 0;
int batchSize = line.hasOption('b') ? Integer.parseInt(line.getOptionValue('b')) : 1;
@@ -138,9 +138,10 @@ public static void main(String[] args) throws SecurityException, NoSuchFieldExce
runtime.addHandler(LeadershipChange.class, new LeadershipHandler());
if (udp) {
- new UdpServer(runtime, serverAddresses, clientAddresses, port, threads, serverId).run();
+ new UdpServer(runtime, serverAddresses, null, port, threads, serverId).run();
} else {
- new TcpServer(runtime, new EmptyStateMachine(), zookeeper, serverAddresses, clientAddresses, port, threads, serverId, twoStages).run();
+ new TcpServer(runtime, new EmptyStateMachine(), null, zookeeper, serverAddresses,
+ port, threads, serverId, twoStages).run();
}
} catch (Exception e) {
HelpFormatter formatter = new HelpFormatter();
View
17 src/main/java/com/yahoo/pasc/paxos/server/ServerHandler.java
@@ -35,14 +35,16 @@
import com.yahoo.pasc.Message;
import com.yahoo.pasc.PascRuntime;
import com.yahoo.pasc.paxos.messages.Accept;
+import com.yahoo.pasc.paxos.messages.ControlMessage;
import com.yahoo.pasc.paxos.messages.DigestToSM;
import com.yahoo.pasc.paxos.messages.Execute;
-import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.LeadershipChange;
import com.yahoo.pasc.paxos.messages.MessageType;
import com.yahoo.pasc.paxos.messages.PaxosMessage;
import com.yahoo.pasc.paxos.messages.PreReply;
+import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.state.PaxosState;
+import com.yahoo.pasc.paxos.statemachine.ControlHandler;
import com.yahoo.pasc.paxos.statemachine.Response;
import com.yahoo.pasc.paxos.statemachine.StateMachine;
@@ -54,18 +56,21 @@
private ServerConnection serverConnection;
private boolean startedMonitor = false;
private StateMachine stateMachine;
+ private ControlHandler controlHandler;
- public ServerHandler(PascRuntime<PaxosState> runtime, StateMachine stateMachine, ServerConnection serverConnection) {
+ public ServerHandler(PascRuntime<PaxosState> runtime, StateMachine stateMachine, ControlHandler controlHandler,
+ ServerConnection serverConnection) {
this.runtime = runtime;
this.serverConnection = serverConnection;
this.stateMachine = stateMachine;
+ this.controlHandler = controlHandler;
}
@Override
public synchronized void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if (!startedMonitor) {
startedMonitor = true;
- new Thread(new ThroughputMonitor()).start();
+// new Thread(new ThroughputMonitor()).start();
}
}
@@ -82,6 +87,12 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
if (LOG.isTraceEnabled()) {
LOG.trace("Message received {}", message);
}
+ if (message instanceof ControlMessage) {
+ if (controlHandler != null) {
+ controlHandler.handleControlMessage((ControlMessage) message);
+ }
+ return;
+ }
MessageType type = MessageType.getMessageType(message);
long startTime = System.nanoTime();
if (!leadershipQueue.isEmpty()) {
View
8 src/main/java/com/yahoo/pasc/paxos/server/tcp/TcpServer.java
@@ -60,6 +60,7 @@
import com.yahoo.pasc.paxos.server.ServerConnection;
import com.yahoo.pasc.paxos.server.ServerHandler;
import com.yahoo.pasc.paxos.state.PaxosState;
+import com.yahoo.pasc.paxos.statemachine.ControlHandler;
import com.yahoo.pasc.paxos.statemachine.StateMachine;
public class TcpServer implements ServerConnection {
@@ -92,8 +93,9 @@
private Barrier barrier;
- public TcpServer(PascRuntime<PaxosState> runtime, StateMachine sm, String zk, String servers[], String clients[], int port,
- int threads, final int id, boolean twoStages) throws IOException, KeeperException {
+ public TcpServer(PascRuntime<PaxosState> runtime, StateMachine sm, ControlHandler controlHandler, String zk,
+ String servers[], int port, int threads, final int id, boolean twoStages)
+ throws IOException, KeeperException {
this.bossExecutor = Executors.newCachedThreadPool();
this.workerExecutor = Executors.newCachedThreadPool();
this.executionHandler = new ExecutionHandler(new MemoryAwareThreadPoolExecutor(1, 1024 * 1024,
@@ -112,7 +114,7 @@ public Thread newThread(Runnable r) {
return new Thread(r, id + "-" + count++);
}
}));
- this.channelHandler = new ServerHandler(runtime, sm, this);
+ this.channelHandler = new ServerHandler(runtime, sm, controlHandler, this);
this.channelPipelineFactory = new PipelineFactory(channelHandler, executionHandler, twoStages);
this.leaderElection = new LeaderElection(zk, id, this.channelHandler);
this.barrier = new Barrier(new ZooKeeper(zk, 2000, leaderElection), "/paxos_srv_barrier", "" + id, servers.length);
View
7 src/main/java/com/yahoo/pasc/paxos/statemachine/ControlHandler.java
@@ -0,0 +1,7 @@
+package com.yahoo.pasc.paxos.statemachine;
+
+import com.yahoo.pasc.paxos.messages.ControlMessage;
+
+public interface ControlHandler {
+ public void handleControlMessage(ControlMessage controlMessage);
+}
Please sign in to comment.
Something went wrong with that request. Please try again.