Skip to content

Commit

Permalink
- NioConnection now sends and reads cookie as well; TCP_NIO2 is now a…
Browse files Browse the repository at this point in the history
…ble to talk to TCP (https://issues.jboss.org/browse/JGRP-1952)

- Added TP to jg-protocols.xml
  • Loading branch information
belaban committed Jan 11, 2016
1 parent e1ca605 commit 6bc167f
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 57 deletions.
1 change: 1 addition & 0 deletions conf/jg-protocol-ids.xml
Expand Up @@ -68,6 +68,7 @@
<class id="72" name="org.jgroups.protocols.FD_HOST"/> <class id="72" name="org.jgroups.protocols.FD_HOST"/>
<class id="73" name="org.jgroups.protocols.ABP"/> <class id="73" name="org.jgroups.protocols.ABP"/>
<class id="74" name="org.jgroups.protocols.TCP_NIO2"/> <class id="74" name="org.jgroups.protocols.TCP_NIO2"/>
<class id="75" name="org.jgroups.protocols.TP"/>


<!-- IDs reserved for building blocks --> <!-- IDs reserved for building blocks -->
<class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID --> <class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
Expand Down
24 changes: 14 additions & 10 deletions src/org/jgroups/blocks/cs/Connection.java
Expand Up @@ -9,14 +9,18 @@
/** /**
* Represents a connection to a peer * Represents a connection to a peer
*/ */
public interface Connection extends Closeable { public abstract class Connection implements Closeable {
boolean isOpen(); protected static final byte[] cookie= { 'b', 'e', 'l', 'a' };
boolean isConnected(); protected Address peer_addr; // address of the 'other end' of the connection
Address localAddress(); protected long last_access; // timestamp of the last access to this connection (read or write)
Address peerAddress();
boolean isExpired(long millis); abstract public boolean isOpen();
void connect(Address dest) throws Exception; abstract public boolean isConnected();
void start() throws Exception; abstract public Address localAddress();
void send(byte[] buf, int offset, int length) throws Exception; abstract public Address peerAddress();
void send(ByteBuffer buf) throws Exception; abstract public boolean isExpired(long millis);
abstract public void connect(Address dest) throws Exception;
abstract public void start() throws Exception;
abstract public void send(byte[] buf, int offset, int length) throws Exception;
abstract public void send(ByteBuffer buf) throws Exception;
} }
108 changes: 77 additions & 31 deletions src/org/jgroups/blocks/cs/NioConnection.java
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -25,11 +26,9 @@
* @author Bela Ban * @author Bela Ban
* @since 3.6.5 * @since 3.6.5
*/ */
public class NioConnection implements Connection { public class NioConnection extends Connection {
protected SocketChannel channel; // the channel to the peer protected SocketChannel channel; // the channel to the peer
protected SelectionKey key; protected SelectionKey key;
protected Address peer_addr; // address of the 'other end' of the connection
protected long last_access; // timestamp of the last access to this connection (read or write)
protected final NioBaseServer server; protected final NioBaseServer server;


protected final Buffers send_buf; // send messages via gathering writes protected final Buffers send_buf; // send messages via gathering writes
Expand All @@ -39,7 +38,8 @@ public class NioConnection implements Connection {
protected final Lock send_lock=new ReentrantLock(); // serialize send() protected final Lock send_lock=new ReentrantLock(); // serialize send()


// creates an array of 2: length buffer (for reading the length of the following data buffer) and data buffer // creates an array of 2: length buffer (for reading the length of the following data buffer) and data buffer
protected Buffers recv_buf=new Buffers(2).add(ByteBuffer.allocate(Global.INT_SIZE), null); // protected Buffers recv_buf=new Buffers(2).add(ByteBuffer.allocate(Global.INT_SIZE), null);
protected Buffers recv_buf=new Buffers(4).add(ByteBuffer.allocate(cookie.length));
protected Reader reader=new Reader(); // manages the thread which receives messages protected Reader reader=new Reader(); // manages the thread which receives messages
protected long reader_idle_time=20000; // number of ms a reader can be idle (no msgs) until it terminates protected long reader_idle_time=20000; // number of ms a reader can be idle (no msgs) until it terminates


Expand Down Expand Up @@ -169,10 +169,14 @@ public void send(byte[] buf, int offset, int length) throws Exception {
*/ */
@Override @Override
public void send(ByteBuffer buf) throws Exception { public void send(ByteBuffer buf) throws Exception {
send(buf, true);
}


public void send() throws Exception {
send_lock.lock(); send_lock.lock();
try { try {
// makeLengthBuffer() reuses the same pre-allocated buffer and copies it only if the write didn't complete boolean success=send_buf.write(channel);
boolean success=send_buf.add(makeLengthBuffer(buf), buf).write(channel);
writeInterest(!success); writeInterest(!success);
if(success) if(success)
updateLastAccessed(); updateLastAccessed();
Expand All @@ -188,9 +192,20 @@ public void send(ByteBuffer buf) throws Exception {
} }




public void send() throws Exception {
/** Read the length first, then the actual data. This method is not reentrant and access must be synchronized */
public void receive() throws Exception {
reader.receive();
}

protected void send(ByteBuffer buf, boolean send_length) throws Exception {
send_lock.lock(); send_lock.lock();
try { try {
// makeLengthBuffer() reuses the same pre-allocated buffer and copies it only if the write didn't complete
if(send_length)
send_buf.add(makeLengthBuffer(buf), buf);
else
send_buf.add(buf);
boolean success=send_buf.write(channel); boolean success=send_buf.write(channel);
writeInterest(!success); writeInterest(!success);
if(success) if(success)
Expand All @@ -206,25 +221,20 @@ public void send() throws Exception {
} }
} }




/** Read the length first, then the actual data. This method is not reentrant and access must be synchronized */
public void receive() throws Exception {
reader.receive();
}


protected boolean _receive(boolean update) throws Exception { protected boolean _receive(boolean update) throws Exception {
ByteBuffer msg; ByteBuffer msg;
Receiver receiver=server.receiver(); Receiver receiver=server.receiver();


if((msg=recv_buf.readLengthAndData(channel)) == null)
return false;
if(peer_addr == null && server.usePeerConnections()) { if(peer_addr == null && server.usePeerConnections()) {
peer_addr=readPeerAddress(msg); if((peer_addr=readPeerAddress()) != null) {
server.addConnection(peer_addr, this); recv_buf=new Buffers(2).add(ByteBuffer.allocate(Global.INT_SIZE), null);
return true; server.addConnection(peer_addr, this);
return true;
}
} }

if((msg=recv_buf.readLengthAndData(channel)) == null)
return false;
if(receiver != null) if(receiver != null)
receiver.receive(peer_addr, msg); receiver.receive(peer_addr, msg);
if(update) if(update)
Expand Down Expand Up @@ -311,10 +321,12 @@ protected void setSocketParameters(Socket client_sock) throws SocketException {
protected void sendLocalAddress(Address local_addr) throws Exception { protected void sendLocalAddress(Address local_addr) throws Exception {
try { try {
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(); ByteArrayDataOutputStream out=new ByteArrayDataOutputStream();
out.write(cookie, 0, cookie.length);
out.writeShort(Version.version); out.writeShort(Version.version);
out.writeShort(local_addr.size()); // address size
local_addr.writeTo(out); local_addr.writeTo(out);
ByteBuffer buf=out.getByteBuffer(); ByteBuffer buf=out.getByteBuffer();
send(buf); send(buf, false);
updateLastAccessed(); updateLastAccessed();
} }
catch(Exception ex) { catch(Exception ex) {
Expand All @@ -323,18 +335,52 @@ protected void sendLocalAddress(Address local_addr) throws Exception {
} }
} }


protected Address readPeerAddress(ByteBuffer buf) throws Exception { protected Address readPeerAddress() throws Exception {
ByteArrayDataInputStream in=new ByteArrayDataInputStream(buf); while(recv_buf.read(channel)) {
short version=in.readShort(); // version int current_position=recv_buf.position()-1;
if(!Version.isBinaryCompatible(version)) ByteBuffer buf=recv_buf.get(current_position);
throw new IOException("packet from " + channel.getRemoteAddress() + " has different version (" + Version.print(version) + if(buf == null)
") from ours (" + Version.printVersion() + "); discarding it"); return null;
Address client_peer_addr=new IpAddress(); buf.flip();
client_peer_addr.readFrom(in); switch(current_position) {
updateLastAccessed(); case 0: // cookie
return client_peer_addr; byte[] cookie_buf=getBuffer(buf);
if(!Arrays.equals(cookie, cookie_buf))
throw new IllegalStateException("BaseServer.NioConnection.readPeerAddress(): cookie read by "
+ server.localAddress() + " does not match own cookie; terminating connection");
recv_buf.add(ByteBuffer.allocate(Global.SHORT_SIZE));
break;
case 1: // version
short version=buf.getShort();
if(!Version.isBinaryCompatible(version))
throw new IOException("packet from " + channel.getRemoteAddress() + " has different version (" + Version.print(version) +
") from ours (" + Version.printVersion() + "); discarding it");
recv_buf.add(ByteBuffer.allocate(Global.SHORT_SIZE));
break;
case 2: // length of address
short addr_len=buf.getShort();
recv_buf.add(ByteBuffer.allocate(addr_len));
break;
case 3: // address
byte[] addr_buf=getBuffer(buf);
ByteArrayDataInputStream in=new ByteArrayDataInputStream(addr_buf);
IpAddress addr=new IpAddress();
addr.readFrom(in);
return addr;
default:
throw new IllegalStateException(String.format("position %d is invalid", recv_buf.position()));
}
}
return null;
}

protected static byte[] getBuffer(final ByteBuffer buf) {
byte[] retval=new byte[buf.limit()];
buf.get(retval, buf.position(), buf.limit());
return retval;
} }



protected static ByteBuffer makeLengthBuffer(ByteBuffer buf) { protected static ByteBuffer makeLengthBuffer(ByteBuffer buf) {
return (ByteBuffer)ByteBuffer.allocate(Global.INT_SIZE).putInt(buf.remaining()).clear(); return (ByteBuffer)ByteBuffer.allocate(Global.INT_SIZE).putInt(buf.remaining()).clear();
} }
Expand Down
17 changes: 6 additions & 11 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Expand Up @@ -10,6 +10,7 @@
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand All @@ -21,15 +22,12 @@
* @author Bela Ban * @author Bela Ban
* @since 3.6.5 * @since 3.6.5
*/ */
public class TcpConnection implements Connection { public class TcpConnection extends Connection {
protected final Socket sock; // socket to/from peer (result of srv_sock.accept() or new Socket()) protected final Socket sock; // socket to/from peer (result of srv_sock.accept() or new Socket())
protected final ReentrantLock send_lock=new ReentrantLock(); // serialize send() protected final ReentrantLock send_lock=new ReentrantLock(); // serialize send()
protected static final byte[] cookie= { 'b', 'e', 'l', 'a' };
protected static final Buffer termination=new Buffer(cookie); protected static final Buffer termination=new Buffer(cookie);
protected DataOutputStream out; protected DataOutputStream out;
protected DataInputStream in; protected DataInputStream in;
protected Address peer_addr; // address of the 'other end' of the connection
protected long last_access;
protected volatile Sender sender; protected volatile Sender sender;
protected volatile Receiver receiver; protected volatile Receiver receiver;
protected final TcpBaseServer server; protected final TcpBaseServer server;
Expand Down Expand Up @@ -231,6 +229,7 @@ protected void sendLocalAddress(Address local_addr) throws Exception {


// write the version // write the version
out.writeShort(Version.version); out.writeShort(Version.version);
out.writeShort(local_addr.size()); // address size
local_addr.writeTo(out); local_addr.writeTo(out);
out.flush(); // needed ? out.flush(); // needed ?
updateLastAccessed(); updateLastAccessed();
Expand All @@ -253,7 +252,7 @@ protected Address readPeerAddress(Socket client_sock) throws Exception {
// read the cookie first // read the cookie first
byte[] input_cookie=new byte[cookie.length]; byte[] input_cookie=new byte[cookie.length];
in.readFully(input_cookie, 0, input_cookie.length); in.readFully(input_cookie, 0, input_cookie.length);
if(!matchCookie(input_cookie)) if(!Arrays.equals(cookie, input_cookie))
throw new SocketException("BaseServer.TcpConnection.readPeerAddress(): cookie read by " throw new SocketException("BaseServer.TcpConnection.readPeerAddress(): cookie read by "
+ server.localAddress() + " does not match own cookie; terminating connection"); + server.localAddress() + " does not match own cookie; terminating connection");
// then read the version // then read the version
Expand All @@ -262,6 +261,8 @@ protected Address readPeerAddress(Socket client_sock) throws Exception {
throw new IOException("packet from " + client_sock.getInetAddress() + ":" + client_sock.getPort() + throw new IOException("packet from " + client_sock.getInetAddress() + ":" + client_sock.getPort() +
" has different version (" + Version.print(version) + " has different version (" + Version.print(version) +
") from ours (" + Version.printVersion() + "); discarding it"); ") from ours (" + Version.printVersion() + "); discarding it");
short addr_len=in.readShort(); // only needed by NioConnection

Address client_peer_addr=new IpAddress(); Address client_peer_addr=new IpAddress();
client_peer_addr.readFrom(in); client_peer_addr.readFrom(in);
updateLastAccessed(); updateLastAccessed();
Expand All @@ -273,12 +274,6 @@ protected Address readPeerAddress(Socket client_sock) throws Exception {
} }




protected static boolean matchCookie(byte[] input) {
if(input == null || input.length < cookie.length) return false;
for(int i=0; i < cookie.length; i++)
if(cookie[i] != input[i]) return false;
return true;
}


protected class Receiver implements Runnable { protected class Receiver implements Runnable {
protected final Thread recv; protected final Thread recv;
Expand Down
4 changes: 1 addition & 3 deletions src/org/jgroups/conf/ClassConfigurator.java
Expand Up @@ -192,9 +192,7 @@ public static short getMagicNumber(Class clazz) {


public static short getProtocolId(Class protocol) { public static short getProtocolId(Class protocol) {
Short retval=protocol_ids.get(protocol); Short retval=protocol_ids.get(protocol);
if(retval != null) return retval != null? retval : 0;
return retval;
return 0;
} }




Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/nio/Buffers.java
Expand Up @@ -153,7 +153,7 @@ public ByteBuffer readLengthAndData(SocketChannel ch) throws Exception {
* {@link ByteBuffer#clear()} has to be called (all buffers have their position == limit on a successfull read). * {@link ByteBuffer#clear()} has to be called (all buffers have their position == limit on a successfull read).
*/ */
public boolean read(SocketChannel ch) throws Exception { public boolean read(SocketChannel ch) throws Exception {
long bytes=ch.read(bufs, position, limit); long bytes=ch.read(bufs, position, limit-position);
if(bytes == -1) if(bytes == -1)
throw new EOFException(); throw new EOFException();
return adjustPosition(false); return adjustPosition(false);
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/nio/MockSocketChannel.java
Expand Up @@ -48,6 +48,8 @@ public MockSocketChannel bytesToRead(ByteBuffer buf) {
return this; return this;
} }


public ByteBuffer bytesToRead() {return bytes_to_read;}

public MockSocketChannel recorder(ByteBuffer buf) {this.recorder=buf; return this;} public MockSocketChannel recorder(ByteBuffer buf) {this.recorder=buf; return this;}
public ByteBuffer recorder() {return recorder;} public ByteBuffer recorder() {return recorder;}


Expand Down
4 changes: 3 additions & 1 deletion src/org/jgroups/protocols/TP.java
Expand Up @@ -4,6 +4,7 @@
import org.jgroups.*; import org.jgroups.*;
import org.jgroups.annotations.*; import org.jgroups.annotations.*;
import org.jgroups.blocks.LazyRemovalCache; import org.jgroups.blocks.LazyRemovalCache;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.conf.PropertyConverters; import org.jgroups.conf.PropertyConverters;
import org.jgroups.logging.LogFactory; import org.jgroups.logging.LogFactory;
import org.jgroups.stack.DiagnosticsHandler; import org.jgroups.stack.DiagnosticsHandler;
Expand Down Expand Up @@ -1085,7 +1086,8 @@ public void evictLogicalAddressCache(boolean force) {




public void init() throws Exception { public void init() throws Exception {
super.init(); this.id=ClassConfigurator.getProtocolId(TP.class);
// super.init();


// Create the default thread factory // Create the default thread factory
if(global_thread_factory == null) if(global_thread_factory == null)
Expand Down

0 comments on commit 6bc167f

Please sign in to comment.