Skip to content

Commit

Permalink
Big work dump. Restructured some servers, new RPC client design
Browse files Browse the repository at this point in the history
  • Loading branch information
drevell committed May 4, 2011
1 parent 481278f commit b0fc4b1
Show file tree
Hide file tree
Showing 46 changed files with 2,808 additions and 805 deletions.
17 changes: 15 additions & 2 deletions conf/conf.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
CP=$MEGALON_BASE/conf:$MEGALON_BASE/conf/*:$MEGALON_BASE/lib/*:$MEGALON_BASE/bin/classes:$CLASSPATH
PROPS="-Dmegalon.logfile=$MEGALON_BASE/log/megalon.log -Dlog4j.configuration=log4j.properties"
if [ "$MEGALON_CONF_DIR" == "" ] ; then
MEGALON_CONF_DIR="$MEGALON_BASE/conf"
fi

if [ "$MEGALON_LOG_FILE" == "" ] ; then
MEGALON_LOG_FILE="$MEGALON_BASE/log/megalon.log"
fi

if [ "$MEGALON_CONSOLE_LOG_FILE" == "" ] ; then
# Stdout and stderr will be redirected here
MEGALON_CONSOLE_LOG_FILE="$MEGALON_BASE/log/console.log"
fi

CP=$MEGALON_CONF_DIR:$MEGALON_CONF_DIR/*:$MEGALON_BASE/lib/*:$MEGALON_BASE/bin/classes:$CLASSPATH
PROPS="-Dmegalon.logfile=$MEGALON_LOG_FILE -Dlog4j.configuration=log4j.properties -Dmegalon.confdir=$MEGALON_CONF_DIR"
11 changes: 7 additions & 4 deletions conf/megalon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ global:
zk: ["holmes:35793"]

- name: east
hbase: ["holmes:9210"]
hbase: ["meg1:9210"]
coord: ["meg1:35791"]
replsrv: ["meg1:35791"]
replsrv: ["meg1:35792"]
zk: ["meg1:35793"]

- name: north
hbase: ["meg2:9210"]
coord: ["meg2:35791"]
replsrv: ["meg2:35792"]
zk: ["meg2:35793"]

#- name: north
#- name: south

6 changes: 5 additions & 1 deletion conf/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ node:

# Should this node run the replication server component?
run_replsrv: true
repsrv_listen: true
replsrv_listen: true
replsrv_port: 35792

# Should this node run the coordinator component?
run_coord: true
coord_listen: true
coord_port: 35791

# Should this node be a client?
run_client: true

5 changes: 3 additions & 2 deletions run_main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

MEGALON_BASE=`dirname $0`
source $MEGALON_BASE/conf/conf.sh

java -ea -cp $CP $PROPS org.megalon.Main $1 $2 $3 $4 $5
cd $MEGALON_BASE
echo "Running in `pwd`"
java -ea -cp $CP $PROPS org.megalon.Megalon $1 $2 $3 $4 $5 <&- >> $MEGALON_CONSOLE_LOG_FILE 2>&1 &
4 changes: 0 additions & 4 deletions run_preparetest.sh

This file was deleted.

Binary file removed src/avro/.megalon.avpr.swp
Binary file not shown.
11 changes: 6 additions & 5 deletions src/avro/megalon.avpr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{
"name": "AvroWALEntryStatus",
"type": "enum",
"symbols": ["PREPARED", "ACCEPTED", "CHOSEN"]
"symbols": ["NEW", "PREPARED", "ACCEPTED", "CHOSEN"]
},
{
"name": "AvroAckOrNack",
Expand Down Expand Up @@ -41,7 +41,7 @@
"type": "record",
"fields": [
{"name": "n", "type": "long"},
{"name": "reqSerial", "type": "long"},
//{"name": "reqSerial", "type": "long"},
{"name": "walIndex", "type": "long"}
]
},
Expand All @@ -50,23 +50,24 @@
"type": "record",
"fields": [
{"name": "walEntry", "type": ["null", "AvroWALEntry"]},
{"name": "reqSerial", "type": "long"}
//{"name": "reqSerial", "type": "long"},
{"name": "hadQuorum", "type": "boolean"}
]
},
{
"name": "AvroAccept",
"type": "record",
"fields": [
{"name": "walEntry", "type": "AvroWALEntry"},
{"name": "reqSerial", "type": "long"},
//{"name": "reqSerial", "type": "long"},
{"name": "walIndex", "type": "long"}
]
},
{
"name": "AvroAcceptResponse",
"type": "record",
"fields": [
{"name": "reqSerial", "type": "long"},
//{"name": "reqSerial", "type": "long"},
//{"name": "ackOrNack", "type": "AvroAckOrNack"}
{"name": "acked", "type": "boolean"}
]
Expand Down
171 changes: 171 additions & 0 deletions src/java/org/megalon/AvroDecodeStage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package org.megalon;

import java.io.IOException;

import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.megalon.avro.AvroAccept;
import org.megalon.avro.AvroPrepare;
import org.megalon.messages.MegalonMsg;
import org.megalon.messages.MsgAccept;
import org.megalon.messages.MsgPrepare;
import org.megalon.multistageserver.BBInputStream;
import org.megalon.multistageserver.MultiStageServer;
import org.megalon.multistageserver.MultiStageServer.Finisher;
import org.megalon.multistageserver.MultiStageServer.NextAction;
import org.megalon.multistageserver.MultiStageServer.NextAction.Action;
import org.megalon.multistageserver.MultiStageServer.Stage;

/**
* This stage does two things:
* 1. Sockets that just received an RPC request from a remote node
* will be sent here to have the request decoded from Avro into a Java
* object, which will be sent on to the ReplServer execution stage.
* 2. Request payloads that arrived via a socket that have finished
* processing by the ReplServer execution stage will be sent to this
* stage to have the reply encoded into Avro. The encoded payload will
* then be sent to the socketServer to send the encoded response back
* to the client.
*/
class AvroDecodeStage implements MultiStageServer.Stage<MSocketPayload>,
Finisher<MReplPayload> {
public static final int MAX_MSG_LEN = 10000000;

MultiStageServer<MSocketPayload> myServer;
Stage<MSocketPayload> myServerNextStage;
MultiStageServer<MReplPayload> enqueueServer;
Stage<MReplPayload> enqueueStage;
Stage<MSocketPayload> selectorStage;

Log logger = LogFactory.getLog(AvroDecodeStage.class);

/**
* No-arg constructor, the caller must call init() before using this object.
*/
public AvroDecodeStage() {}

public AvroDecodeStage(Stage<MSocketPayload> myServerNextStage,
MultiStageServer<MReplPayload> enqueueServer,
Stage<MReplPayload> entryStage,
Stage<MSocketPayload> selectorStage) {
this.init(myServerNextStage, enqueueServer, entryStage, selectorStage);
}

public void init(Stage<MSocketPayload> myServerNextStage,
MultiStageServer<MReplPayload> enqueueServer,
Stage<MReplPayload> entryStage,
Stage<MSocketPayload> selectorStage) {
this.enqueueServer = enqueueServer;
this.myServerNextStage = myServerNextStage;
this.enqueueStage = entryStage;
this.selectorStage = selectorStage;
}

final DatumReader<AvroPrepare> prepareReader =
new SpecificDatumReader<AvroPrepare>(AvroPrepare.class);
final DatumReader<AvroAccept> acceptReader =
new SpecificDatumReader<AvroAccept>(AvroAccept.class);

/**
* This finisher will handle payloads when the main replication server is
* done with them.
*/
public void finish(MReplPayload mPayload) {
MSocketPayload mSockPayload = (MSocketPayload)mPayload.getOuterPayload();
mSockPayload.resp = mPayload.resp;
mSockPayload.reqType = mPayload.msgType;
myServer.enqueue(mSockPayload, myServerNextStage);
}

public NextAction<MSocketPayload> runStage(MSocketPayload payload)
throws Exception {
// TODO it would be nice if we didn't use Avro directly here. It
// would be better to have some general system where any
// serialization system could be plugged in.

logger.debug("In AvroDecodeStage.runStage");
logger.debug("Incoming buffers: " + RPCUtil.strBufs(payload.readBufs));
while(true) {
try {
if(!RPCUtil.hasCompleteMessage(payload.readBufs)) {
logger.debug("Don't have complete msg, back to selector");
return new NextAction<MSocketPayload>(Action.FORWARD,
selectorStage);
}
} catch (IOException e) {
logger.warn("Misformatted message", e);
return new NextAction<MSocketPayload>(Action.FINISHED, null);
}
logger.debug("Have >0 complete messages");
BBInputStream is = payload.is;

// Read the incoming msg length prefix, and sanity check it
int msgLen = RPCUtil.readInt(is);
logger.debug("Incoming msgLen " + msgLen);
int minReqdBytes = RPCUtil.RPC_HEADER_SIZE-4;
if(msgLen < minReqdBytes) {
logger.warn("Message was too short to contain " +
"required fields, need " + minReqdBytes);
return new NextAction<MSocketPayload>(Action.FINISHED, null);
}
if(msgLen > MAX_MSG_LEN) {
// The incoming message claims to be very large. It's
// probably just misformatted, or the wrong protocol.
logger.warn("ReplServer msg claimed to be huge. Closing.");
return new NextAction<MSocketPayload>(Action.FINISHED, null);
}
assert is.available() >= msgLen;

long reqSerial = RPCUtil.readLong(is);

byte[] msgType = new byte[1];
is.read(msgType);
Decoder dec =
DecoderFactory.get().binaryDecoder(payload.is, null);
MReplPayload newPayload = new MReplPayload(msgType[0], payload);
newPayload.reqSerial = reqSerial;

switch(msgType[0]) {
case MegalonMsg.MSG_PREPARE:
// TODO reuse avro obj here?
AvroPrepare avroPrepare = prepareReader.read(null, dec);
newPayload.req = new MsgPrepare(avroPrepare);
logger.debug("DecodeStage Enqueueing prepare into ReplServer");
enqueueServer.enqueue(newPayload, enqueueStage, this);
logger.debug("AvroDecode has a \"prepare\"");
return new NextAction<MSocketPayload>(Action.IGNORE, null);
case MegalonMsg.MSG_ACCEPT:
AvroAccept avroAccept = acceptReader.read(null, dec);
newPayload.req = new MsgAccept(avroAccept);
logger.debug("DecodeStage Enqueueing accept into ReplServer");
enqueueServer.enqueue(newPayload, enqueueStage, this);
return new NextAction<MSocketPayload>(Action.IGNORE, null);
default:
logger.warn("Repl server saw unexpected message type: " +
msgType[0]);
return new NextAction<MSocketPayload>(Action.FINISHED, null);

}
}
}

public int getNumConcurrent() {
return 10; // TODO configurable
}

public String getName() {
return this.getClass().getName();
}

public int getBacklogSize() {
return 100; // TODO configurable
}

public void setServer(MultiStageServer<MSocketPayload> server) {
this.myServer = server;
}
}
Loading

0 comments on commit b0fc4b1

Please sign in to comment.