Skip to content

Commit

Permalink
added config 'state transfer' to joining member
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Dec 8, 2011
1 parent d7f3c3c commit bf5dd39
Showing 1 changed file with 134 additions and 24 deletions.
158 changes: 134 additions & 24 deletions tests/perf/org/jgroups/tests/perf/MPerf.java
Expand Up @@ -4,6 +4,7 @@
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

Expand Down Expand Up @@ -36,10 +37,12 @@ public class MPerf extends ReceiverAdapter {
protected int num_threads=1;
protected int log_interval=num_msgs / 10; // log every 10%


/** Maintains stats per sender, will be sent to perf originator when all messages have been received */
protected final Map<Address,MemberInfo> senders=Util.createConcurrentMap();
protected final List<Address> members=new ArrayList<Address>();
protected final Log log=LogFactory.getLog(getClass());
protected boolean looping=true;

/** Map<Object, MemberInfo>. A hashmap of senders, each value is the 'senders' hashmap */
protected final Map<Object,MemberInfo> results=Util.createConcurrentMap();
Expand Down Expand Up @@ -81,23 +84,29 @@ public void start(String props, String name) throws Exception {
channel.setReceiver(this);
channel.connect("mperf");
local_addr=channel.getAddress();

// send a CONFIG_REQ to the current coordinator, so we can get the current config
Address coord=channel.getView().getMembers().get(0);
if(coord != null && !local_addr.equals(coord))
sendConfigRequest(coord);
}


protected void loop() throws Exception {
protected void loop() {
int c;

final String INPUT="[1] Send [2] View\n" +
"[3] Set num msgs (%d) [4] Set warmup (%d) [5] Set msg size (%s) [6] Set threads (%d)\n" +
"[x] Exit this [X] Exit all ";

while(true) {
while(looping) {
try {
c=Util.keyPress(String.format(INPUT, num_msgs,warmup, Util.printBytes(msg_size), num_threads));
switch(c) {
case '1':
break;
case '2':
System.out.println("view: " + channel.getView() + " (local address=" + channel.getAddress() + ")");
break;
case '3':
configChange("num_msgs");
Expand All @@ -111,15 +120,19 @@ protected void loop() throws Exception {
case '6':
configChange("num_threads");
break;

case 'x':
return;
looping=false;
break;
case 'X':
sendExit();
break;
}
}
catch(Throwable t) {
System.err.println(t);
}
}
stop();
}

protected void configChange(String name) throws Exception {
Expand All @@ -131,6 +144,26 @@ protected void configChange(String name) throws Exception {
channel.send(msg);
}

protected void sendExit() throws Exception {
Message msg=new Message();
msg.putHeader(ID, new MPerfHeader(MPerfHeader.EXIT));
channel.send(msg);
}

// Send a CONFIG_REQ to the current coordinator
protected void sendConfigRequest(Address coord) throws Exception {
Message msg=new Message(coord);
msg.setFlag(Message.Flag.RSVP);
msg.putHeader(ID, new MPerfHeader(MPerfHeader.CONFIG_REQ));
channel.send(msg);
}

protected void sendConfigurationResponse(Address target, Configuration cfg) throws Exception {
Message msg=new Message(target, null, cfg);
msg.putHeader(ID, new MPerfHeader(MPerfHeader.CONFIG_RSP));
channel.send(msg);
}


protected void output(Object msg) {
if(this.output != null) {
Expand All @@ -154,6 +187,7 @@ private static String printProperties() {
}

public void stop() {
looping=false;
Util.close(channel);
if(this.output != null) {
try {
Expand All @@ -173,26 +207,67 @@ public void receive(Message msg) {

case MPerfHeader.CONFIG_CHANGE:
ConfigChange config_change=(ConfigChange)msg.getObject();
String attr_name=config_change.attr_name;
handleConfigChange(config_change);
break;

case MPerfHeader.CONFIG_REQ:
try {
Object attr_value=config_change.getValue();
Field field=Util.getField(this.getClass(),attr_name);
Util.setField(field,this,attr_value);
handleConfigRequest(msg.getSrc());
}
catch(Exception e) {
System.err.println("failed applying config change for attr " + attr_name + ": " + e);
e.printStackTrace();
}
break;

case MPerfHeader.CONFIG_RSP:
handleConfigResponse((Configuration)msg.getObject());
break;

case MPerfHeader.EXIT:
ProtocolStack stack=channel.getProtocolStack();
String cluster_name=channel.getClusterName();
stack.stopStack(cluster_name);
stack.destroy();
break;

default:
System.err.println("Header type " + hdr.type + " not recognized");
}
}

protected void handleConfigChange(ConfigChange config_change) {
String attr_name=config_change.attr_name;
try {
Object attr_value=config_change.getValue();
Field field=Util.getField(this.getClass(), attr_name);
Util.setField(field, this, attr_value);
System.out.println(config_change.attr_name + "=" + attr_value);
log_interval=num_msgs / 10;
}
catch(Exception e) {
System.err.println("failed applying config change for attr " + attr_name + ": " + e);
}
}

protected void handleConfigRequest(Address sender) throws Exception {
Configuration cfg=new Configuration();
cfg.addChange("num_msgs", num_msgs);
cfg.addChange("warmup", warmup);
cfg.addChange("msg_size", msg_size);
cfg.addChange("num_threads", num_threads);
sendConfigurationResponse(sender, cfg);
}

protected void handleConfigResponse(Configuration cfg) {
for(ConfigChange change: cfg.changes) {
handleConfigChange(change);
}
}


public void viewAccepted(View view) {
System.out.println("** " + view);
}

protected class Sender extends Thread {
CyclicBarrier barrier;
Expand Down Expand Up @@ -371,15 +446,49 @@ public int size() {
public void writeTo(DataOutput out) throws Exception {
Util.writeString(attr_name, out);
Util.writeByteBuffer(attr_value, out);

}

public void readFrom(DataInput in) throws Exception {
attr_name=Util.readString(in);
attr_value=Util.readByteBuffer(in);
}
}


protected static class Configuration implements Streamable {
protected List<ConfigChange> changes=new ArrayList<ConfigChange>();

public Configuration() {
}

public Configuration addChange(String key, Object val) throws Exception {
if(key != null && val != null) {
changes.add(new ConfigChange(key, val));
}
return this;
}

public int size() {
int retval=Global.INT_SIZE;
for(ConfigChange change: changes)
retval+=change.size();
return retval;
}

public void writeTo(DataOutput out) throws Exception {
out.writeInt(changes.size());
for(ConfigChange change: changes)
change.writeTo(out);
}

public void readFrom(DataInput in) throws Exception {
int len=in.readInt();
for(int i=0; i < len; i++) {
ConfigChange change=new ConfigChange();
change.readFrom(in);
changes.add(change);
}
}
}

protected static class MPerfHeader extends Header {
Expand All @@ -391,25 +500,19 @@ protected static class MPerfHeader extends Header {
protected static final byte CONFIG_CHANGE = 6;
protected static final byte CONFIG_REQ = 7;
protected static final byte CONFIG_RSP = 8;
protected static final byte EXIT = 9;

protected byte type;


public MPerfHeader() {
}

public MPerfHeader(byte type) {
this.type=type;
}

public MPerfHeader() {}
public MPerfHeader(byte type) {this.type=type;}
public int size() {return Global.BYTE_SIZE;}
public void writeTo(DataOutput out) throws Exception {out.writeByte(type);}
public void readFrom(DataInput in) throws Exception {type=in.readByte();}
}


public static void main(String[] args) {
MPerf test=null;
String props=null, name=null;

for(int i=0; i < args.length; i++) {
Expand All @@ -425,17 +528,24 @@ public static void main(String[] args) {
return;
}

test=new MPerf();
final MPerf test=new MPerf();
try {
test.start(props,name);
test.loop();

// this kludge is needed in order to terminate the program gracefully when 'X' is pressed
// (otherwise System.in.read() would not terminate)
Thread thread=new Thread() {
public void run() {
test.loop();
}
};
thread.setDaemon(true);
thread.start();

}
catch(Exception e) {
e.printStackTrace();
}
finally {
test.stop();
}
}

}

0 comments on commit bf5dd39

Please sign in to comment.