Permalink
Browse files

Merge pull request #46 from FranciscoBorges/master

1 dead lock fixed, 1 thread leak fixed, finally cleaned all concurrency control in ClientSessionFactory
  • Loading branch information...
2 parents eec90bb + 7c5b864 commit 3bffc58089687934e1c1bb57ef6bc8dc95c440f0 @clebertsuconic clebertsuconic committed Apr 11, 2012
Showing with 551 additions and 670 deletions.
  1. +1 −0 .gitignore
  2. +1 −29 hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
  3. +32 −40 hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
  4. +16 −87 hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
  5. +6 −0 hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
  6. +3 −2 hornetq-core/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
  7. +1 −0 hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
  8. +0 −2 hornetq-core/src/main/java/org/hornetq/core/postoffice/PostOffice.java
  9. +47 −18 hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
  10. +79 −42 hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateSessionMessage.java
  11. +33 −9 ...q-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
  12. +27 −16 ...main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
  13. +33 −19 ...e/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionProducerCreditsMessage.java
  14. +25 −19 ...n/java/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
  15. +9 −18 hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
  16. +11 −3 hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/HttpAcceptorHandler.java
  17. +1 −1 hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/HttpKeepAliveRunnable.java
  18. +4 −19 hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
  19. +6 −1 hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
  20. +0 −3 hornetq-core/src/main/java/org/hornetq/core/settings/impl/AddressSettings.java
  21. +3 −10 hornetq-jms/src/main/java/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
  22. +0 −3 hornetq-jms/src/main/java/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
  23. +5 −2 hornetq-journal/src/main/java/org/hornetq/core/journal/impl/SyncSpeedTest.java
  24. +1 −0 pom.xml
  25. +15 −10 tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/CrashClient2.java
  26. +2 −1 ...egration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
  27. +58 −68 tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
  28. +40 −53 ...s/integration-tests/src/test/java/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
  29. +21 −13 ...egration-tests/src/test/java/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
  30. +1 −11 ...rc/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
  31. +40 −56 ...ests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
  32. +7 −35 ...c/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
  33. +2 −4 tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
  34. +0 −10 ...src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
  35. +0 −14 ...ation-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
  36. +3 −4 tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQActivationTest.java
  37. +6 −6 tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
  38. +1 −1 tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
  39. +1 −19 tests/jms-tests/src/test/java/org/hornetq/jms/tests/SecurityTest.java
  40. +5 −8 tests/jms-tests/src/test/java/org/hornetq/jms/tests/tools/WrappedJNDIServer.java
  41. +2 −0 tests/timing-tests/src/test/java/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
  42. +3 −14 ...tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
View
@@ -670,3 +670,4 @@ tests/**/server.lock
/docs/quickstart-guide/.project
native/autom4te.cache
+.settings/org.eclipse.core.resources.prefs
@@ -29,22 +29,14 @@
* until the buffer is filled up or the user set a streaming.
* @author clebertsuconic
*/
-public class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal
+public final class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
// Used only when receiving large messages
private LargeMessageController largeMessageController;
private long largeMessageSize;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
/**
* @return the largeMessageSize
*/
@@ -115,9 +107,6 @@ public LargeMessageController getLargeMessageController()
return largeMessageController;
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
- */
@Override
public void saveToOutputStream(final OutputStream out) throws HornetQException
{
@@ -132,9 +121,6 @@ public void saveToOutputStream(final OutputStream out) throws HornetQException
}
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ClientMessage#setOutputStream(java.io.OutputStream)
- */
@Override
public void setOutputStream(final OutputStream out) throws HornetQException
{
@@ -148,9 +134,6 @@ public void setOutputStream(final OutputStream out) throws HornetQException
}
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ClientMessage#waitOutputStreamCompletion()
- */
@Override
public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
{
@@ -177,12 +160,6 @@ public void discardBody()
}
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
private void checkBuffer()
{
if (bodyBuffer == null)
@@ -219,15 +196,10 @@ private void checkBuffer()
this.bufferOut = out;
}
- /* (non-Javadoc)
- * @see java.io.OutputStream#write(int)
- */
@Override
public void write(int b) throws IOException
{
bufferOut.writeByte((byte)(b & 0xff));
}
-
}
-
}
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -112,11 +113,6 @@
private final Set<ClientSessionInternal> sessions = new HashSet<ClientSessionInternal>();
- private final Object exitLock = new Object();
-
-
- private boolean inCreateSession;
-
private final Object createSessionLock = new Object();
private final Object failoverLock = new Object();
private final Object connectionLock = new Object();
@@ -146,7 +142,14 @@
private Future<?> pingerFuture;
private PingRunnable pingRunnable;
+ /** Flag that signals that the factory is closing. Causes many processes to exit. */
private volatile boolean exitLoop;
+ /** Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatche} */
+ private final Object inCreateSessionGuard = new Object();
+ /** Flag that tells whether we are trying to create a session. */
+ private boolean inCreateSession;
+ /** Used to wait for the creation of a session. */
+ private CountDownLatch inCreateSessionLatch;
private final List<Interceptor> interceptors;
@@ -156,7 +159,7 @@
public final Exception e = new Exception();
- private final Object waitLock = new Object();
+ private final CountDownLatch waitLatch = new CountDownLatch(1);
public final static Set<CloseRunnable> CLOSE_RUNNABLES = Collections.synchronizedSet(new HashSet<CloseRunnable>());
@@ -437,10 +440,7 @@ public boolean removeFailureListener(final SessionFailureListener listener)
public void causeExit()
{
exitLoop = true;
- synchronized (waitLock)
- {
- waitLock.notifyAll();
- }
+ waitLatch.countDown();
}
public void close()
@@ -451,9 +451,10 @@ public void close()
}
exitLoop = true;
- synchronized (exitLock)
+ synchronized (inCreateSessionGuard)
{
- exitLock.notifyAll();
+ if (inCreateSessionLatch != null)
+ inCreateSessionLatch.countDown();
}
forceReturnChannel1();
@@ -618,9 +619,11 @@ private void failoverOrReconnect(final Object connectionID, final HornetQExcepti
final boolean needToInterrupt;
- synchronized (exitLock)
+ CountDownLatch exitLockLatch;
+ synchronized (inCreateSessionGuard)
{
needToInterrupt = inCreateSession;
+ exitLockLatch = inCreateSessionLatch;
}
unlockChannel1();
@@ -634,17 +637,14 @@ private void failoverOrReconnect(final Object connectionID, final HornetQExcepti
// Now we need to make sure that the thread has actually exited and returned it's connections
// before failover occurs
- synchronized (exitLock)
+ while (inCreateSession && !exitLoop)
{
- while (inCreateSession && !exitLoop)
+ try
+ {
+ exitLockLatch.await(500, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
{
- try
- {
- exitLock.wait(5000);
- }
- catch (InterruptedException e)
- {
- }
}
}
}
@@ -766,9 +766,13 @@ private ClientSession createSessionInternal(final String username,
} // We can now release the failoverLock
// We now set a flag saying createSession is executing
- synchronized (exitLock)
+ synchronized (inCreateSessionGuard)
{
+ if (exitLoop)
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "ClientSession closed while creating session");
inCreateSession = true;
+ inCreateSessionLatch = new CountDownLatch(1);
}
long sessionChannelID = connection.generateChannelID();
@@ -866,7 +870,6 @@ private ClientSession createSessionInternal(final String username,
if (lock != null)
{
lock.unlock();
-
lock = null;
}
@@ -876,11 +879,7 @@ private ClientSession createSessionInternal(final String username,
}
else
{
- HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
- "Failed to create session",
- t);
-
- throw me;
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to create session", t);
}
}
finally
@@ -891,12 +890,8 @@ private ClientSession createSessionInternal(final String username,
}
// Execution has finished so notify any failover thread that may be waiting for us to be done
- synchronized (exitLock)
- {
- inCreateSession = false;
-
- exitLock.notify();
- }
+ inCreateSession = false;
+ inCreateSessionLatch.countDown();
}
}
while (retry);
@@ -1000,9 +995,7 @@ private void getConnectionWithRetry(final int reconnectAttempts)
int count = 0;
- synchronized (waitLock)
- {
- while (!exitLoop)
+ while (!exitLoop)
{
if (ClientSessionFactoryImpl.isDebug)
{
@@ -1045,7 +1038,7 @@ else if (reconnectAttempts == 1)
try
{
- waitLock.wait(interval);
+ waitLatch.await(interval, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ignore)
{
@@ -1076,7 +1069,6 @@ else if (reconnectAttempts == 1)
return;
}
}
- }
}
private void cancelScheduledTasks()
Oops, something went wrong.

0 comments on commit 3bffc58

Please sign in to comment.