Skip to content
Browse files

set options in Message itself when passed through castMessage() or se…

…ndMessage() calls
  • Loading branch information...
1 parent c2fc992 commit 030fd2423b585734d2a11d7c60a016d0c5a5d9ff @belaban committed
View
19 src/org/jgroups/blocks/MessageDispatcher.java
@@ -293,6 +293,9 @@ protected GroupRequest cast(final Collection<Address> dests, Message msg, Reques
if(options != null) {
req.setResponseFilter(options.getRspFilter());
req.setAnycasting(options.getAnycasting());
+ msg.setFlag(options.getFlags());
+ if(options.getScope() > 0)
+ msg.setScope(options.getScope());
}
req.setBlockForResults(block_for_results);
@@ -320,6 +323,12 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
return null;
}
+ if(opts != null) {
+ msg.setFlag(opts.getFlags());
+ if(opts.getScope() > 0)
+ msg.setScope(opts.getScope());
+ }
+
UnicastRequest req=new UnicastRequest(msg, corr, dest, opts);
try {
req.execute();
@@ -328,7 +337,7 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
throw new RuntimeException("failed executing request " + req, t);
}
- if(opts.getMode() == ResponseMode.GET_NONE)
+ if(opts != null && opts.getMode() == ResponseMode.GET_NONE)
return null;
Rsp rsp=req.getResult();
@@ -348,11 +357,17 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
return null;
}
+ if(options != null) {
+ msg.setFlag(options.getFlags());
+ if(options.getScope() > 0)
+ msg.setScope(options.getScope());
+ }
+
UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, options);
req.setBlockForResults(false);
try {
req.execute();
- if(options.getMode() == ResponseMode.GET_NONE)
+ if(options != null && options.getMode() == ResponseMode.GET_NONE)
return new NullFuture<T>(null);
return req;
}
View
120 tests/other/org/jgroups/tests/DatagramTest.java
@@ -1,120 +0,0 @@
-package org.jgroups.tests;
-
-import org.jgroups.util.Util;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * Tests loss rate of UDP datagrams
- * @author Bela Ban
- */
-public class DatagramTest {
- DatagramSocket sock;
-
-
- private void start(boolean sender, InetAddress host, int port, int num_packets, int size, long sleep,
- int buffer_size) throws Exception {
- if(sender) {
- sock=new DatagramSocket();
- if(buffer_size > 0)
- sock.setSendBufferSize(buffer_size);
- System.out.println("local socket is " + sock.getLocalSocketAddress() + ", send buffer size=" + sock.getSendBufferSize());
- sendPackets(num_packets, size, host, port, sleep);
- }
- else {
- sock=new DatagramSocket(new InetSocketAddress(host, port));
- if(buffer_size > 0)
- sock.setReceiveBufferSize(buffer_size);
- System.out.println("receive buffer size=" + sock.getReceiveBufferSize());
- System.out.println("listening on " + sock.getLocalSocketAddress());
- loop();
- }
- }
-
- private void loop() throws IOException {
- byte[] buf=new byte[70000];
- DatagramPacket packet;
- int count=0;
- while(true) {
- packet=new DatagramPacket(buf, buf.length);
- sock.receive(packet);
- count++;
- System.out.print("received " + count + " packets\r");
- }
- }
-
- private void sendPackets(int num_packets, int size, InetAddress host, int port, long sleep) throws Exception {
- byte[] buf=new byte[size];
- DatagramPacket packet;
- int print=num_packets / 10;
- for(int i=0; i < num_packets; i++) {
- packet=new DatagramPacket(buf, buf.length, host, port);
- sock.send(packet);
- if(print == 0 || i % print == 0)
- System.out.print("sent " + i + " messages\r");
- if(sleep> 0) {
- Util.sleep(sleep);
- }
- }
- }
-
-
- public static void main(String[] args) throws Exception {
- boolean sender=false;
- InetAddress host=null;
- int port=5000;
- int num_packets=10000;
- int size=1000;
- long sleep=0;
- int buf=0;
-
-
- for(int i=0; i < args.length; i++) {
- if(args[i].equals("-sender")) {
- sender=true;
- continue;
- }
- if(args[i].equals("-host")) {
- host=InetAddress.getByName(args[++i]);
- continue;
- }
- if(args[i].equals("-port")) {
- port=Integer.parseInt(args[++i]);
- continue;
- }
- if(args[i].equals("-num_packets")) {
- num_packets=Integer.parseInt(args[++i]);
- continue;
- }
- if(args[i].equals("-size")) {
- size=Integer.parseInt(args[++i]);
- continue;
- }
- if(args[i].equals("-sleep")) {
- sleep=Long.parseLong(args[++i]);
- continue;
- }
- if(args[i].equals("-buf")) {
- buf=Integer.parseInt(args[++i]);
- continue;
- }
- help();
- return;
- }
-
- if(host == null)
- host=InetAddress.getByName("localhost");
- new DatagramTest().start(sender, host, port, num_packets, size, sleep, buf);
- }
-
-
- private static void help() {
- System.out.println("DatagramTest [-help] [-sender] [-host] [-port] [-num_packets <num>] [-size <size>] " +
- "[-sleep <ms>] [-buf <buf in bytes>]");
- }
-
-}
View
102 tests/other/org/jgroups/tests/JmxTest.java
@@ -1,102 +0,0 @@
-package org.jgroups.tests;
-
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.View;
-import org.jgroups.jmx.JmxConfigurator;
-import org.jgroups.util.Util;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-/**
- * @author Bela Ban
- */
-public class JmxTest {
- MBeanServer server;
- JChannel channel;
- final String domain="JGroups";
-
-
-
-
- private boolean start(String props) throws Exception {
- server=Util.getMBeanServer();
- if(server == null) {
- System.err.println("No MBeanServers found;" +
- "\nJmxTest needs to be run with an MBeanServer present, or inside JDK 5");
- return false;
- }
- channel=new JChannel(props);
- channel.connect("DemoChannel");
- JmxConfigurator.registerChannel(channel, server, domain, channel.getClusterName() , true);
- return true;
- }
-
-
- void doWork() throws Exception {
- server=Util.getMBeanServer();
- if(server == null) {
- System.err.println("No MBeanServers found;" +
- "\nJmxTest needs to be run with an MBeanServer present, or inside JDK 5");
- return;
- }
- ObjectName channelName=new ObjectName("JGroups:channel=DemoChannel");
-
- // 1. get view and print it
- View v=(View)server.getAttribute(channelName, "View");
- System.out.println("view: " + v);
-
- // 2. send a bunch of messages
- System.out.println("sending some messages");
- Message msg;
- for(int i=0; i < 5; i++) {
- msg=new Message(null, null, "hello from " + i);
- server.invoke(channelName, "send", new Object[]{msg}, new String[]{msg.getClass().getName()});
- }
-
- Util.sleep(500);
-
- // 3. dump number of messages
- int numMsgs=((Integer)server.getAttribute(channelName, "NumMessages")).intValue();
- System.out.println("channel has " + numMsgs + " messages:");
-
- String queue=(String)server.invoke(channelName, "dumpQueue", null, null);
- System.out.println(queue);
-
- System.out.println("messages are:");
- Object obj;
- for(int i=0; i < numMsgs; i++) {
- obj=server.invoke(channelName, "receive", new Object[]{new Long(10)},
- new String[]{long.class.getName()});
- System.out.println("#" + i + ": " + obj);
- }
- }
-
- public static void main(String[] args) {
- String props=null;
-
- for(int i=0; i < args.length; i++) {
- if(args[i].equals("-props")) {
- props=args[++i];
- continue;
- }
- System.out.println("JmxTest [-props <props>]");
- }
-
- try {
- boolean rc=false;
- JmxTest test=new JmxTest();
- rc=test.start(props);
- if(rc == false)
- return;
- // test.doWork();
- while(true)
- Util.sleep(60000);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
-
-}
View
14 tests/other/org/jgroups/tests/LargeState.java
@@ -32,7 +32,6 @@
public class LargeState extends ReceiverAdapter {
Channel channel;
byte[] state=null;
- Thread getter=null;
boolean rc=false;
String props;
long start, stop;
@@ -40,7 +39,6 @@
int size=100000;
int total_received=0;
final Promise state_promise=new Promise();
- static final int STREAMING_CHUNK_SIZE=10000;
public void start(boolean provider, int size, String props,boolean jmx) throws Exception {
@@ -59,27 +57,17 @@ public void start(boolean provider, int size, String props,boolean jmx) throws E
if(provider) {
this.size=size;
- // System.out.println("Creating state of " + size + " bytes");
- // state=createLargeState(size);
System.out.println("Waiting for other members to join and fetch large state");
-
-// System.out.println("sending a few messages");
-// for(int i=0; i < 100; i++) {
-// channel.send(null, null, "hello world " + i);
-// }
}
else {
System.out.println("Getting state");
start=System.currentTimeMillis();
- // total_received=0;
state_promise.reset();
rc=channel.getState(null, 0);
System.out.println("getState(), rc=" + rc);
if(rc)
state_promise.getResult(10000);
}
-
- // mainLoop();
if(!provider) {
channel.close();
}
@@ -91,7 +79,7 @@ public void start(boolean provider, int size, String props,boolean jmx) throws E
}
- byte[] createLargeState(int size) {
+ static byte[] createLargeState(int size) {
return new byte[size];
}
View
106 tests/other/org/jgroups/tests/LatencyTest.java
@@ -1,106 +0,0 @@
-package org.jgroups.tests;
-
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.protocols.TP;
-import org.jgroups.stack.ProtocolStack;
-import org.jgroups.util.Util;
-
-/**
- * @author Bela Ban
- */
-public class LatencyTest {
- JChannel ch;
-
-
- private void start(boolean sender, boolean local, String props) throws Exception {
- if(local) {
- JChannel ch1, ch2;
- ch1=new JChannel(props);
- ch1.connect("x");
- ch2=new JChannel(props);
- ch2.setReceiver(new MyReceiver());
- ch2.connect("x");
- for(int i=0; i < 10; i++) {
- Message msg=new Message();
- msg.setFlag((byte)(Message.DONT_BUNDLE | Message.NO_FC));
- msg.setObject(System.nanoTime());
- ch1.send(msg);
- Util.sleep(1000);
- }
- ch2.close();
- ch1.close();
- return;
- }
-
- if(sender) {
- ch=new JChannel(props);
- disableBundling(ch);
- ch.connect("x");
- for(int i=0; i < 10; i++) {
- Message msg=new Message();
- msg.setFlag((byte)(Message.DONT_BUNDLE | Message.NO_FC));
- msg.setObject(System.nanoTime());
- ch.send(msg);
- Util.sleep(1000);
- }
- ch.close();
- }
- else {
- ch=new JChannel(props);
- disableBundling(ch);
- ch.setReceiver(new MyReceiver());
- ch.connect("x");
- System.out.println("receiver ready");
- while(true)
- Util.sleep(10000);
- }
- }
-
- private static void disableBundling(JChannel ch) {
- System.out.println("Disabling message bundling (as this would increase latency)");
- ProtocolStack stack=ch.getProtocolStack();
- TP transport=stack.getTransport();
- transport.setEnableBundling(false);
- }
-
-
- static class MyReceiver extends ReceiverAdapter {
-
- public void receive(Message msg) {
- Long timestamp=(Long)msg.getObject();
- long time=System.nanoTime() - timestamp.longValue();
- double time_ms=time / 1000.0 / 1000.0;
- System.out.println("time for message: " + Util.format(time_ms) + " ms");
- }
- }
-
- public static void main(String[] args) throws Exception {
- boolean sender=false, local=false;
- String props=null;
- for(int i=0; i < args.length; i++) {
- if(args[i].equalsIgnoreCase("-sender")) {
- sender=true;
- continue;
- }
- if(args[i].equalsIgnoreCase("-local")) {
- local=true;
- continue;
- }
- if(args[i].equalsIgnoreCase("-props")) {
- props=args[++i];
- continue;
- }
- help();
- return;
- }
- new LatencyTest().start(sender, local, props);
- }
-
-
- private static void help() {
- System.out.println("JGroupsLatencyTest [-sender] [-local] [-props <properties>]");
- }
-
-}
View
12 tests/other/org/jgroups/tests/McastDiscovery.java
@@ -25,7 +25,7 @@
int mcast_port = 5000;
long interval = 2000; // time between sends
McastSender mcast_sender = null;
- boolean running = true;
+ volatile boolean running = true;
HashMap map = new HashMap(); // keys=interface (InetAddress), values=List of receivers (InetAddress)
@@ -42,7 +42,7 @@ public void run() {
while (running) {
for (Iterator it = handlers.iterator(); it.hasNext();) {
handler = (MessageHandler) it.next();
- handler.sendDiscoveryRequest(ttl);
+ handler.sendDiscoveryRequest();
}
try {
sleep(interval);
@@ -135,7 +135,7 @@ void printValidInterfaces() {
}
}
- if (map.size() > 0)
+ if (!map.isEmpty())
System.out.println("\n-- Valid interfaces are " + map.keySet() + '\n');
else {
System.out.println("\nNo valid interfaces found, listing interfaces by number of responses/interface:\n" +
@@ -283,7 +283,7 @@ void stop() {
}
- void sendDiscoveryRequest(int ttl) {
+ void sendDiscoveryRequest() {
DiscoveryRequest req;
byte[] buf;
DatagramPacket packet;
@@ -354,10 +354,11 @@ static void help() {
abstract class DiscoveryPacket implements Serializable {
-
+ private static final long serialVersionUID=-2592954324310791792L;
}
class DiscoveryRequest extends DiscoveryPacket {
+ private static final long serialVersionUID=7587678128986493349L;
InetSocketAddress sender_addr = null;
DiscoveryRequest(InetAddress addr, int port) {
@@ -373,6 +374,7 @@ public String toString() {
class DiscoveryResponse extends DiscoveryPacket {
+ private static final long serialVersionUID=6862354518175504139L;
InetSocketAddress discovery_responder = null; // address of member who responds to discovery request
InetAddress interface_used = null;
View
3 tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java
@@ -67,10 +67,11 @@ void sendMessages(int num) throws Exception {
if(show <=0) show=1;
start=System.currentTimeMillis();
+ RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, TIMEOUT).setFlags(Message.DONT_BUNDLE).setFlags(Message.NO_FC);
System.out.println("-- sending " + num + " messages");
for(int i=1; i <= num; i++) {
- disp.castMessage(null, new Message(), new RequestOptions(ResponseMode.GET_ALL, TIMEOUT));
+ disp.castMessage(null, new Message(), opts);
if(i % show == 0)
System.out.println("-- sent " + i);
}

0 comments on commit 030fd24

Please sign in to comment.
Something went wrong with that request. Please try again.