Skip to content
Permalink
Browse files
AXIS2-4759: Applied patch submitted by Grant Patterson. Fixed Service…
…TaskManager to properly closes the connection. Also fixed some concurrency issues.
  • Loading branch information
veithen committed Jul 3, 2010
1 parent 95b3afa commit 6af743e1ff9336c3f894d9cae8252d61fe695de6
Showing 1 changed file with 39 additions and 31 deletions.
@@ -229,9 +229,9 @@ public synchronized void stop() {

if (sharedConnection != null) {
try {
sharedConnection.stop();
sharedConnection.close();
} catch (JMSException e) {
logError("Error stopping shared Connection", e);
logError("Error closing shared Connection", e);
} finally {
sharedConnection = null;
}
@@ -337,7 +337,7 @@ private class MessageListenerTask implements Runnable, ExceptionListener {
/** Is this task idle right now? */
private volatile boolean idle = false;
/** Is this task connected to the JMS provider successfully? */
private boolean connected = false;
private volatile boolean connected = false;

/** As soon as we create a new polling task, add it to the STM for control later */
MessageListenerTask() {
@@ -439,31 +439,36 @@ public void run() {
}

} finally {

if (log.isTraceEnabled()) {
log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() +
" is stopping after processing : " + messageCount + " messages :: " +
" isActive : " + isActive() + " maxMessagesPerTask : " +
getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() +
" idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " +
getIdleTaskExecutionLimit());
} else if (log.isDebugEnabled()) {
log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() +
" is stopping after processing : " + messageCount + " messages");
}

// Close the consumer and session before decrementing activeTaskCount.
// (If we have a shared connection, Qpid deadlocks if the shared connection
// is closed on another thread while closing the session)
closeConsumer(true);
closeSession(true);
closeConnection();

workerState = STATE_STOPPED;
activeTaskCount--;
synchronized(pollingTasks) {
pollingTasks.remove(this);
}

// My time is up, so if I am going away, create another
scheduleNewTaskIfAppropriate();
}

if (log.isTraceEnabled()) {
log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() +
" is stopping after processing : " + messageCount + " messages :: " +
" isActive : " + isActive() + " maxMessagesPerTask : " +
getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() +
" idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " +
getIdleTaskExecutionLimit());
} else if (log.isDebugEnabled()) {
log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() +
" is stopping after processing : " + messageCount + " messages");
}

closeConsumer(true);
closeSession(true);
closeConnection();

// My time is up, so if I am going away, create another
scheduleNewTaskIfAppropriate();
}

/**
@@ -670,20 +675,23 @@ private Connection getConnection() {
// Connection is not shared
if (connection == null) {
connection = createConnection();
setConnected(true);
}
} else {
if (sharedConnection != null) {
connection = sharedConnection;
} else {
synchronized(this) {
if (sharedConnection == null) {
sharedConnection = createConnection();
}
connection = sharedConnection;

} else if (connection == null) {
// Connection is shared, but may not have been created

synchronized(ServiceTaskManager.this) {
if (sharedConnection == null) {
sharedConnection = createConnection();
}
}
connection = sharedConnection;
setConnected(true);

}
setConnected(true);
// else: Connection is shared and is already referenced by this.connection

return connection;
}

0 comments on commit 6af743e

Please sign in to comment.