Skip to content

Commit

Permalink
backport of http://jira.jboss.com/jira/browse/JGRP-637 from CVS head …
Browse files Browse the repository at this point in the history
…(persistent port numbers)
  • Loading branch information
belaban committed Nov 28, 2007
1 parent f78a833 commit 6202a3f
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 39 deletions.
8 changes: 8 additions & 0 deletions src/org/jgroups/blocks/BasicConnectionTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.jgroups.Global;
import org.jgroups.Version;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.PortsManager;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.Util;

Expand All @@ -25,6 +26,7 @@
/**
* Shared class for TCP connection tables.
* @author Scott Marlow
* @author Bela Ban
*/
public abstract class BasicConnectionTable {
private ThreadFactory factory = new ConnectionTableFactory();
Expand Down Expand Up @@ -52,6 +54,8 @@ public abstract class BasicConnectionTable {
boolean tcp_nodelay=false;
int linger=-1;

protected PortsManager pm=null;

/**
* The address which will be broadcast to the group (the externally visible address which this host should
* be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
Expand Down Expand Up @@ -175,6 +179,10 @@ public void stop() {
// 2. close the server socket (this also stops the acceptor thread)
if(srv_sock != null) {
try {
if(pm != null) {
int tmp_port=srv_sock.getLocalPort();
pm.removePort(tmp_port);
}
ServerSocket tmp=srv_sock;
srv_sock=null;
tmp.close();
Expand Down
41 changes: 36 additions & 5 deletions src/org/jgroups/blocks/ConnectionTable.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// $Id: ConnectionTable.java,v 1.62.2.1 2007/11/26 21:16:46 vlada Exp $
// $Id: ConnectionTable.java,v 1.62.2.2 2007/11/28 10:58:23 belaban Exp $

package org.jgroups.blocks;

import org.jgroups.Address;
import org.jgroups.util.PortsManager;
import org.jgroups.stack.IpAddress;

import java.io.IOException;
Expand Down Expand Up @@ -57,6 +58,17 @@ public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time
}


public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr,
int srv_port, int max_port) throws Exception {
setReceiver(r);
this.bind_addr=bind_addr;
this.external_addr=external_addr;
this.srv_port=srv_port;
this.max_port=max_port;
init();
}


/**
* Create a ConnectionTable
* @param r A reference to a receiver of all messages received by this class. Method <code>receive()</code>
Expand All @@ -73,16 +85,31 @@ public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time
* @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
* then there is no limit.
*/
public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception {
public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr,
int srv_port, int max_port, PortsManager pm) throws Exception {
setReceiver(r);
this.bind_addr=bind_addr;
this.external_addr=external_addr;
this.srv_port=srv_port;
this.max_port=max_port;
this.max_port=max_port;
this.pm=pm;
init();
}


public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
long reaper_interval, long conn_expire_time) throws Exception {
setReceiver(r);
this.bind_addr=bind_addr;
this.external_addr=external_addr;
this.srv_port=srv_port;
this.max_port=max_port;
this.reaper_interval=reaper_interval;
this.conn_expire_time=conn_expire_time;
use_reaper=true;
init();
}

/**
* ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
* milliseconds will be closed and removed from the connection table. On next access they will be re-created.
Expand All @@ -104,12 +131,13 @@ public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_a
* it will be reaped
*/
public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
long reaper_interval, long conn_expire_time) throws Exception {
long reaper_interval, long conn_expire_time, PortsManager pm) throws Exception {
setReceiver(r);
this.bind_addr=bind_addr;
this.external_addr=external_addr;
this.srv_port=srv_port;
this.max_port=max_port;
this.pm=pm;
this.reaper_interval=reaper_interval;
this.conn_expire_time=conn_expire_time;
use_reaper=true;
Expand Down Expand Up @@ -178,7 +206,8 @@ public final void start() throws Exception {
super.start();
}

protected void init() throws Exception {
protected void init() throws Exception {

srv_sock=createServerSocket(srv_port, max_port);

if (external_addr!=null)
Expand Down Expand Up @@ -309,6 +338,8 @@ protected ServerSocket createServerSocket(int start_port, int end_port) throws E

while(true) {
try {
if(start_port > 0 && pm != null)
start_port=pm.getNextAvailablePort(start_port);
if(bind_addr == null)
ret=new ServerSocket(start_port);
else {
Expand Down
76 changes: 59 additions & 17 deletions src/org/jgroups/blocks/ConnectionTableNIO.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// $Id: ConnectionTableNIO.java,v 1.37 2007/11/06 17:13:50 vlada Exp $
// $Id: ConnectionTableNIO.java,v 1.37.2.1 2007/11/28 10:58:22 belaban Exp $

package org.jgroups.blocks;

import org.apache.commons.logging.Log;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.DirectExecutor;
import org.jgroups.util.PortsManager;

import java.io.IOException;
import java.net.*;
Expand Down Expand Up @@ -95,30 +95,44 @@ public ConnectionTableNIO(int srv_port, long reaper_interval,
* @param max_port
* @throws Exception
*/
public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port
)
throws Exception
public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port)
throws Exception
{
setReceiver(r);
this.external_addr=external_addr;
this.bind_addr=bind_addr;
this.srv_port=srv_port;
this.max_port=max_port;
use_reaper=true;
start();
setReceiver(r);
this.external_addr=external_addr;
this.bind_addr=bind_addr;
this.srv_port=srv_port;
this.max_port=max_port;
use_reaper=true;
start();
}


public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
boolean doStart
)
public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr,
int srv_port, int max_port, boolean doStart)
throws Exception
{
setReceiver(r);
this.external_addr=external_addr;
this.bind_addr=bind_addr;
this.srv_port=srv_port;
this.max_port=max_port;
use_reaper=true;
if(doStart)
start();
}

public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr,
int srv_port, int max_port, PortsManager pm,
boolean doStart)
throws Exception
{
setReceiver(r);
this.external_addr=external_addr;
this.bind_addr=bind_addr;
this.srv_port=srv_port;
this.max_port=max_port;
this.pm=pm;
use_reaper=true;
if(doStart)
start();
Expand Down Expand Up @@ -151,7 +165,8 @@ public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress externa
}


public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr,
int srv_port, int max_port,
long reaper_interval, long conn_expire_time, boolean doStart
) throws Exception
{
Expand All @@ -167,6 +182,24 @@ public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress externa
start();
}

public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr,
int srv_port, int max_port, PortsManager pm,
long reaper_interval, long conn_expire_time, boolean doStart
) throws Exception
{
setReceiver(r);
this.bind_addr=bind_addr;
this.external_addr=external_addr;
this.srv_port=srv_port;
this.max_port=max_port;
this.pm=pm;
this.reaper_interval=reaper_interval;
this.conn_expire_time=conn_expire_time;
use_reaper=true;
if(doStart)
start();
}



public int getReaderThreads() { return m_reader_threads; }
Expand Down Expand Up @@ -332,7 +365,12 @@ protected void init()

// use directExector if max thread pool size is less than or equal to zero.
if(getProcessorMaxThreads() <= 0) {
m_requestProcessors = new DirectExecutor();
m_requestProcessors = new Executor() {

public void execute(Runnable command) {
command.run();
}
};
}
else
{
Expand Down Expand Up @@ -585,6 +623,10 @@ protected ServerSocket createServerSocket(int start_port, int end_port) throws E
this.m_acceptSelector = Selector.open();
m_serverSocketChannel = ServerSocketChannel.open();
m_serverSocketChannel.configureBlocking(false);

if(start_port > 0 && pm != null)
start_port=pm.getNextAvailablePort(start_port);

while (true)
{
try
Expand Down
12 changes: 7 additions & 5 deletions src/org/jgroups/protocols/TCP.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// $Id: TCP.java,v 1.44.2.1 2007/11/26 21:16:45 vlada Exp $
// $Id: TCP.java,v 1.44.2.2 2007/11/28 10:58:21 belaban Exp $

package org.jgroups.protocols;


import org.jgroups.Address;
import org.jgroups.blocks.ConnectionTable;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.PortsManager;

import java.net.InetAddress;
import java.util.Collection;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void retainAll(Collection<Address> members) {
}

public void start() throws Exception {
ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);
ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port,pm);
// ct.addConnectionListener(this);
ct.setUseSendQueues(use_send_queues);
ct.setSendQueueSize(send_queue_size);
Expand Down Expand Up @@ -101,10 +102,11 @@ public void stop() {
* ConnectionTable.
*/
protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress,
InetAddress externalAddress, int startPort, int endPort) throws Exception {
InetAddress externalAddress, int startPort, int endPort,
PortsManager pm) throws Exception {
ConnectionTable cTable;
if(reaperInterval == 0 && connExpireTime == 0) {
cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort);
cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort, pm);
}
else {
if(reaperInterval == 0) {
Expand All @@ -116,7 +118,7 @@ protected ConnectionTable getConnectionTable(long reaperInterval, long connExpir
if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + connExpireTime);
}
cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort,
reaperInterval, connExpireTime);
reaperInterval, connExpireTime, pm);
}
cTable.setThreadFactory(getProtocolStack().getThreadFactory());
return cTable;
Expand Down
12 changes: 7 additions & 5 deletions src/org/jgroups/protocols/TCP_NIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.jgroups.blocks.BasicConnectionTable;
import org.jgroups.Address;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.PortsManager;

import java.net.InetAddress;
import java.util.Properties;
Expand All @@ -14,7 +15,7 @@
* @author Scott Marlow
* @author Alex Fu
* @author Bela Ban
* @version $Id: TCP_NIO.java,v 1.17 2007/11/06 17:13:51 vlada Exp $
* @version $Id: TCP_NIO.java,v 1.17.2.1 2007/11/28 10:58:21 belaban Exp $
*/
public class TCP_NIO extends BasicTCP implements BasicConnectionTable.Receiver
{
Expand All @@ -25,10 +26,11 @@ public class TCP_NIO extends BasicTCP implements BasicConnectionTable.Receiver
* @see org.jgroups.protocols.TCP#getConnectionTable(long, long)
*/
protected ConnectionTableNIO getConnectionTable(long ri, long cet,
InetAddress b_addr, InetAddress bc_addr, int s_port, int e_port) throws Exception {
InetAddress b_addr, InetAddress bc_addr,
int s_port, int e_port, PortsManager pm) throws Exception {
ConnectionTableNIO retval=null;
if (ri == 0 && cet == 0) {
retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, false );
retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, pm, false );
}
else {
if (ri == 0) {
Expand All @@ -39,7 +41,7 @@ protected ConnectionTableNIO getConnectionTable(long ri, long cet,
cet = 1000 * 60 * 5;
if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + cet);
}
retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet, false);
retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, pm, ri, cet, false);
}
retval.setThreadFactory(getProtocolStack().getThreadFactory());
retval.setProcessorMaxThreads(getProcessorMaxThreads());
Expand All @@ -58,7 +60,7 @@ public void send(Address dest, byte[] data, int offset, int length) throws Excep
}

public void start() throws Exception {
ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);
ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port,pm);
ct.setUseSendQueues(use_send_queues);
// ct.addConnectionListener(this);
ct.setReceiveBufferSize(recv_buf_size);
Expand Down
Loading

0 comments on commit 6202a3f

Please sign in to comment.