Skip to content

Commit

Permalink
if sender/receiver threads die connection gets cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Blagojevic committed Jul 3, 2010
1 parent 6fb02fd commit 2d79ae9
Showing 1 changed file with 65 additions and 37 deletions.
102 changes: 65 additions & 37 deletions src/org/jgroups/blocks/TCPConnectionMap.java
Expand Up @@ -554,27 +554,38 @@ public void stop() {
recv.interrupt();
}
}

public boolean isRunning() {
return receiving.get();
}

public boolean canRun() {
return isRunning() && isConnected();
}

public void run() {

while(!Thread.currentThread().isInterrupted() && isOpen()) {
try {
int len=in.readInt();
byte[] buf=new byte[len];
in.readFully(buf, 0, len);
updateLastAccessed();
receiver.receive(peer_addr, buf, 0, len);
}
catch(OutOfMemoryError mem_ex) {
break; // continue;
}
catch(IOException io_ex) {
break;
}
catch(Throwable e) {
try {
while(!Thread.currentThread().isInterrupted() && canRun()) {
try {
int len=in.readInt();
byte[] buf=new byte[len];
in.readFully(buf, 0, len);
updateLastAccessed();
receiver.receive(peer_addr, buf, 0, len);
}
catch(OutOfMemoryError mem_ex) {
break; // continue;
}
catch(IOException io_ex) {
break;
}
catch(Throwable e) {
}
}
}
Util.close(TCPConnection.this);
finally{
Util.close(TCPConnection.this);
}
}
}

Expand All @@ -590,10 +601,12 @@ public Sender(ThreadFactory tf,int send_queue_size) {
}

public void addToQueue(byte[] data) throws Exception{
if(isOpenAndRunning())
if(canRun())
send_queue.add(data);
else
throw new Exception("Sender running=" + sending.get() +", underlying connection open= " + isOpen());
throw new Exception("Queue closed since sender thread has died, running="
+ sending.get() + ", underlying connection open= "
+ isConnected());
}

public void start() {
Expand All @@ -608,31 +621,40 @@ public void stop() {
}
}

public boolean isOpenAndRunning() {
return sending.get() && isOpen();
public boolean isRunning() {
return sending.get();
}

public boolean canRun() {
return isRunning() && isConnected();
}

public void run() {
while(isOpenAndRunning()) {
byte[] data=null;
try {
data=send_queue.take();
}
catch(InterruptedException e) {
// Thread.currentThread().interrupt();
break;
}

if(data != null) {
try {
while(!Thread.currentThread().isInterrupted() && canRun()) {
byte[] data=null;
try {
_send(data, 0, data.length, false);
data=send_queue.take();
}
catch(Throwable ignored) {
catch(InterruptedException e) {
// Thread.currentThread().interrupt();
break;
}
}

if(data != null) {
try {
_send(data, 0, data.length, false);
}
catch(Throwable ignored) {
}
}
}
}
finally {
Util.close(TCPConnection.this);
}
if(log.isTraceEnabled())
log.trace("ConnectionTable.Connection.Sender thread terminated");
log.trace("TCPConnection.Sender thread terminated at " + local_addr);
}
}

Expand Down Expand Up @@ -670,9 +692,15 @@ public String toString() {
public boolean isExpired(long now) {
return getConnectionExpiryTimeout() > 0 && now - last_access >= getConnectionExpiryTimeout();
}

public boolean isConnected() {
return !sock.isClosed() && sock.isConnected();
}

public boolean isOpen() {
return !sock.isClosed() && sock.isConnected();
return isConnected()
&& (isSenderUsed() ? sender.isRunning() : true)
&& (connectionPeerReceiver != null && connectionPeerReceiver.isRunning());
}

public void close() throws IOException {
Expand Down

0 comments on commit 2d79ae9

Please sign in to comment.