Skip to content

Commit

Permalink
- TP: moved version checking from handleSingleMessage() and handleMes…
Browse files Browse the repository at this point in the history
…sageBatch() into receive()

- TP implements both receive(InputStream) and receive(byte[]) [https://issues.jboss.org/browse/JGRP-2165]
- BaseServer discards length bytes if no receiver callback has been installed
- TcpConnection now doesn't create intermediate buffer, but passes the input stream directly to the receive() callback
- GossipRouter/RouterStub now implement both receive() callbacks
- PubClient/PubServer now implement both receive() callbacks
- RoundTrip.receive() never used correct offsets to read data
  • Loading branch information
belaban committed Mar 29, 2017
1 parent 5c9b043 commit 044dcbb
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 140 deletions.
10 changes: 10 additions & 0 deletions src/org/jgroups/blocks/cs/BaseServer.java
Expand Up @@ -12,6 +12,7 @@
import org.jgroups.util.*; import org.jgroups.util.*;


import java.io.Closeable; import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
Expand Down Expand Up @@ -165,6 +166,15 @@ public void receive(Address sender, ByteBuffer buf) {
this.receiver.receive(sender, buf); this.receiver.receive(sender, buf);
} }


public void receive(Address sender, DataInput in, int len) throws Exception {
if(this.receiver != null)
this.receiver.receive(sender, in);
else {
// discard len bytes (in.skip() is not guaranteed to discard *all* len bytes)
byte[] buf=new byte[len];
in.readFully(buf, 0, len);
}
}




public void send(Address dest, byte[] data, int offset, int length) throws Exception { public void send(Address dest, byte[] data, int offset, int length) throws Exception {
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/blocks/cs/Receiver.java
Expand Up @@ -2,6 +2,7 @@


import org.jgroups.Address; import org.jgroups.Address;


import java.io.DataInput;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


/** /**
Expand Down Expand Up @@ -31,4 +32,6 @@ public interface Receiver {
* Note that buf could be a direct ByteBuffer. * Note that buf could be a direct ByteBuffer.
*/ */
void receive(Address sender, ByteBuffer buf); // should be a default method in Java 8 void receive(Address sender, ByteBuffer buf); // should be a default method in Java 8

void receive(Address sender, DataInput in) throws Exception;
} }
3 changes: 3 additions & 0 deletions src/org/jgroups/blocks/cs/ReceiverAdapter.java
Expand Up @@ -3,6 +3,7 @@
import org.jgroups.Address; import org.jgroups.Address;
import org.jgroups.util.Util; import org.jgroups.util.Util;


import java.io.DataInput;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


/** /**
Expand All @@ -25,5 +26,7 @@ public void receive(Address sender, ByteBuffer buf) {
Util.bufferToArray(sender, buf, this); Util.bufferToArray(sender, buf, this);
} }


public void receive(Address sender, DataInput in) throws Exception {


}
} }
7 changes: 2 additions & 5 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Expand Up @@ -286,12 +286,9 @@ public void run() {
Throwable t=null; Throwable t=null;
while(canRun()) { while(canRun()) {
try { try {
int len=in.readInt(); int len=in.readInt(); // needed to read messages from TCP_NIO2
if(buffer == null || buffer.length < len) server.receive(peer_addr, in, len);
buffer=new byte[len];
in.readFully(buffer, 0, len);
updateLastAccessed(); updateLastAccessed();
server.receive(peer_addr, buffer, 0, len);
} }
catch(OutOfMemoryError mem_ex) { catch(OutOfMemoryError mem_ex) {
t=mem_ex; t=mem_ex;
Expand Down
31 changes: 25 additions & 6 deletions src/org/jgroups/demos/PubClient.java
@@ -1,10 +1,13 @@
package org.jgroups.demos; package org.jgroups.demos;


import org.jgroups.Address; import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.*; import org.jgroups.blocks.cs.*;
import org.jgroups.util.Bits;
import org.jgroups.util.Util; import org.jgroups.util.Util;


import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
Expand All @@ -28,16 +31,26 @@ public PubClient(String name) {


@Override @Override
public void receive(Address sender, ByteBuffer buf) { public void receive(Address sender, ByteBuffer buf) {
String msg=new String(buf.array(), buf.arrayOffset(), buf.limit()); byte[] buffer=buf.array();
int len=Bits.readInt(buffer, buf.arrayOffset());
String msg=new String(buffer, buf.arrayOffset()+Global.INT_SIZE, len);
System.out.printf("-- %s\n", msg); System.out.printf("-- %s\n", msg);
} }


@Override @Override
public void receive(Address sender, byte[] buf, int offset, int length) { public void receive(Address sender, byte[] buf, int offset, int length) {
String msg=new String(buf, offset, length); int len=Bits.readInt(buf, offset);
String msg=new String(buf, offset+Global.INT_SIZE, len);
System.out.printf("-- %s\n", msg); System.out.printf("-- %s\n", msg);
} }


@Override public void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] buf=new byte[len];
in.readFully(buf);
String msg=new String(buf, 0, buf.length);
System.out.printf("-- %s\n", msg);
}


@Override @Override
public void connectionClosed(Connection conn, String cause) { public void connectionClosed(Connection conn, String cause) {
Expand All @@ -58,8 +71,7 @@ protected void start(InetAddress srv_addr, int srv_port, boolean nio) throws Exc
client.receiver(this); client.receiver(this);
client.addConnectionListener(this); client.addConnectionListener(this);
client.start(); client.start();
byte[] buf=String.format("%s joined\n", name).getBytes(); send(String.format("%s joined", name));
((Client)client).send(buf, 0, buf.length);
eventLoop(); eventLoop();
client.stop(); client.stop();
} }
Expand All @@ -75,8 +87,7 @@ protected void eventLoop() {
if(line.startsWith("quit") || line.startsWith("exit")) { if(line.startsWith("quit") || line.startsWith("exit")) {
break; break;
} }
byte[] buf=String.format("%s: %s\n", name, line).getBytes(); send(String.format("%s: %s", name, line));
((Client)client).send(buf, 0, buf.length);
} }
catch(Exception e) { catch(Exception e) {
e.printStackTrace(); e.printStackTrace();
Expand All @@ -85,6 +96,14 @@ protected void eventLoop() {
} }
} }


protected void send(String str) throws Exception {
byte[] buf=str.getBytes();
byte[] data=new byte[Global.INT_SIZE + buf.length];
Bits.writeInt(buf.length, data, 0);
System.arraycopy(buf, 0, data, Global.INT_SIZE, buf.length);
((Client)client).send(data, 0, data.length);
}

public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
InetAddress server_addr=InetAddress.getLocalHost(); InetAddress server_addr=InetAddress.getLocalHost();
int server_port=7500; int server_port=7500;
Expand Down
11 changes: 11 additions & 0 deletions src/org/jgroups/demos/PubServer.java
@@ -1,13 +1,16 @@
package org.jgroups.demos; package org.jgroups.demos;


import org.jgroups.Address; import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.*; import org.jgroups.blocks.cs.*;
import org.jgroups.jmx.JmxConfigurator; import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log; import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory; import org.jgroups.logging.LogFactory;
import org.jgroups.stack.IpAddress; import org.jgroups.stack.IpAddress;
import org.jgroups.util.Bits;
import org.jgroups.util.Util; import org.jgroups.util.Util;


import java.io.DataInput;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand Down Expand Up @@ -52,6 +55,14 @@ public void receive(Address sender, byte[] buf, int offset, int length) {
} }
} }


public void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] buf=new byte[len + Global.INT_SIZE];
Bits.writeInt(len, buf, 0);
in.readFully(buf, Global.INT_SIZE, len);
server.send(null, buf, 0, buf.length);
}

public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
int port=7500; int port=7500;
InetAddress bind_addr=null; InetAddress bind_addr=null;
Expand Down
7 changes: 0 additions & 7 deletions src/org/jgroups/protocols/BasicTCP.java
Expand Up @@ -77,8 +77,6 @@ protected BasicTCP() {
public void setReaperInterval(long interval) {this.reaper_interval=interval;} public void setReaperInterval(long interval) {this.reaper_interval=interval;}
public long getConnExpireTime() {return conn_expire_time;} public long getConnExpireTime() {return conn_expire_time;}
public void setConnExpireTime(long time) {this.conn_expire_time=time;} public void setConnExpireTime(long time) {this.conn_expire_time=time;}
// public boolean getReuseAddress() {return this.reuse_addr;}
// public BasicTCP setReuseAddress(boolean b) {this.reuse_addr=b; return this;}




public void init() throws Exception { public void init() throws Exception {
Expand Down Expand Up @@ -123,11 +121,6 @@ public String getInfo() {


public abstract void retainAll(Collection<Address> members); public abstract void retainAll(Collection<Address> members);


/** BaseServer.Receiver interface */
public void receive(Address sender, byte[] data, int offset, int length) {
super.receive(sender, data, offset, length);
}

public void receive(Address sender, ByteBuffer buf) { public void receive(Address sender, ByteBuffer buf) {
Util.bufferToArray(sender, buf, this); Util.bufferToArray(sender, buf, this);
} }
Expand Down
71 changes: 41 additions & 30 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -14,6 +14,7 @@
import org.jgroups.util.ThreadFactory; import org.jgroups.util.ThreadFactory;
import org.jgroups.util.UUID; import org.jgroups.util.UUID;


import java.io.DataInput;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.net.InetAddress; import java.net.InetAddress;
Expand Down Expand Up @@ -1240,26 +1241,43 @@ public void receive(Address sender, byte[] data, int offset, int length) {
if(Objects.equals(local_physical_addr, sender)) if(Objects.equals(local_physical_addr, sender))
return; return;


byte flags=data[Global.SHORT_SIZE]; short version=Bits.readShort(data, offset);
boolean is_message_list=(flags & LIST) == LIST; if(!versionMatch(version, sender))
return;
offset+=Global.SHORT_SIZE;
byte flags=data[offset];
offset+=Global.BYTE_SIZE;


boolean is_message_list=(flags & LIST) == LIST, multicast=(flags & MULTICAST) == MULTICAST;
ByteArrayDataInputStream in=new ByteArrayDataInputStream(data, offset, length);
if(is_message_list) // used if message bundling is enabled if(is_message_list) // used if message bundling is enabled
handleMessageBatch(sender, data, offset, length); handleMessageBatch(in, multicast);
else else
handleSingleMessage(sender, data, offset, length); handleSingleMessage(in, multicast);
} }


public void receive(Address sender, DataInput in) throws Exception {
if(in == null) return;


protected void handleMessageBatch(Address sender, byte[] data, int offset, int length) { // drop message from self; it has already been looped back up (https://issues.jboss.org/browse/JGRP-1765)
try { if(Objects.equals(local_physical_addr, sender))
ByteArrayDataInputStream in=new ByteArrayDataInputStream(data, offset, length); return;
short version=in.readShort();
if(!versionMatch(version, sender)) short version=in.readShort();
return; if(!versionMatch(version, sender))
return;
byte flags=in.readByte();

boolean is_message_list=(flags & LIST) == LIST, multicast=(flags & MULTICAST) == MULTICAST;
if(is_message_list) // used if message bundling is enabled
handleMessageBatch(in, multicast);
else
handleSingleMessage(in, multicast);
}


byte flags=in.readByte();
final boolean multicast=(flags & MULTICAST) == MULTICAST;


protected void handleMessageBatch(DataInput in, boolean multicast) {
try {
final MessageBatch[] batches=Util.readMessageBatch(in, multicast); final MessageBatch[] batches=Util.readMessageBatch(in, multicast);
final MessageBatch batch=batches[0], oob_batch=batches[1], internal_batch_oob=batches[2], internal_batch=batches[3]; final MessageBatch batch=batches[0], oob_batch=batches[1], internal_batch_oob=batches[2], internal_batch=batches[3];


Expand All @@ -1274,25 +1292,8 @@ protected void handleMessageBatch(Address sender, byte[] data, int offset, int l
} }




protected void processBatch(MessageBatch batch, boolean oob, boolean internal) { protected void handleSingleMessage(DataInput in, boolean multicast) {
try { try {
if(batch != null && !batch.isEmpty())
msg_processing_policy.process(batch, oob, internal);
}
catch(Throwable t) {
log.error("processing batch failed", t);
}
}

protected void handleSingleMessage(Address sender, byte[] data, int offset, int length) {
try {
ByteArrayDataInputStream in=new ByteArrayDataInputStream(data, offset, length);
short version=in.readShort();
if(!versionMatch(version, sender))
return;

byte flags=in.readByte();
final boolean multicast=(flags & MULTICAST) == MULTICAST;
Message msg=new Message(false); // don't create headers, readFrom() will do this Message msg=new Message(false); // don't create headers, readFrom() will do this
msg.readFrom(in); msg.readFrom(in);


Expand All @@ -1307,6 +1308,16 @@ protected void handleSingleMessage(Address sender, byte[] data, int offset, int
} }
} }


protected void processBatch(MessageBatch batch, boolean oob, boolean internal) {
try {
if(batch != null && !batch.isEmpty())
msg_processing_policy.process(batch, oob, internal);
}
catch(Throwable t) {
log.error("processing batch failed", t);
}
}

public boolean unicastDestMismatch(Address dest) { public boolean unicastDestMismatch(Address dest) {
return dest != null && !(Objects.equals(dest, local_addr) || Objects.equals(dest, local_physical_addr)); return dest != null && !(Objects.equals(dest, local_addr) || Objects.equals(dest, local_physical_addr));
} }
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/TUNNEL.java
Expand Up @@ -179,9 +179,9 @@ public Object handleDownEvent(Event evt) {
public void receive(GossipData data) { public void receive(GossipData data) {
switch (data.getType()) { switch (data.getType()) {
case MESSAGE: case MESSAGE:
byte[] msg=data.getBuffer();
if(Objects.equals(local_addr, data.getSender())) if(Objects.equals(local_addr, data.getSender()))
return; return;
byte[] msg=data.getBuffer();
receive(data.getSender(), msg, 0, msg.length); receive(data.getSender(), msg, 0, msg.length);
break; break;
case SUSPECT: case SUSPECT:
Expand Down
14 changes: 10 additions & 4 deletions src/org/jgroups/stack/GossipData.java
Expand Up @@ -17,7 +17,7 @@




/** /**
* Encapsulates data sent between GossipRouter and GossipClient * Encapsulates data sent between GossipRouter and RouterStub (TCPGOSSIP and TUNNEL)
* @author Bela Ban Oct 4 2001 * @author Bela Ban Oct 4 2001
*/ */
public class GossipData implements SizeStreamable { public class GossipData implements SizeStreamable {
Expand Down Expand Up @@ -88,12 +88,11 @@ public void setPingData(List<PingData> mbrs) {
this.ping_data=mbrs; this.ping_data=mbrs;
} }


public GossipData addPingData(PingData data) { public void addPingData(PingData data) {
if(ping_data == null) if(ping_data == null)
ping_data=new ArrayList<>(); ping_data=new ArrayList<>();
if(data != null) if(data != null)
ping_data.add(data); ping_data.add(data);
return this;
} }




Expand Down Expand Up @@ -162,7 +161,13 @@ public void writeTo(DataOutput out) throws Exception {
} }


public void readFrom(DataInput in) throws Exception { public void readFrom(DataInput in) throws Exception {
type=GossipType.values()[in.readByte()]; readFrom(in, true);
}


protected void readFrom(DataInput in, boolean read_type) throws Exception {
if(read_type)
type=GossipType.values()[in.readByte()];
group=Bits.readString(in); group=Bits.readString(in);
addr=Util.readAddress(in); addr=Util.readAddress(in);
sender=Util.readAddress(in); sender=Util.readAddress(in);
Expand All @@ -189,4 +194,5 @@ public void readFrom(DataInput in) throws Exception {
} }





} }

0 comments on commit 044dcbb

Please sign in to comment.