Permalink
Browse files

fix MultiThreadRandomFailoverTest etc

  • Loading branch information...
1 parent 031cb31 commit 02dee30b9455a4cc7cb6f04dc6717c574e2f47c2 @purplefox purplefox committed Dec 5, 2009
Showing with 111 additions and 96 deletions.
  1. +111 −96 src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
@@ -178,6 +178,8 @@
private volatile boolean workDone;
private final String groupID;
+
+ private volatile boolean inClose;
// Constructors ----------------------------------------------------------------------------
@@ -487,6 +489,10 @@ public void commit() throws HornetQException
rollbackOnFailover();
}
+ else
+ {
+ throw e;
+ }
}
workDone = false;
@@ -750,7 +756,7 @@ public void handleReceiveContinuation(final long consumerID, final SessionReceiv
public void close() throws HornetQException
{
if (closed)
- {
+ {
return;
}
@@ -760,11 +766,16 @@ public void close() throws HornetQException
closeChildren();
+ inClose = true;
+
channel.sendBlocking(new SessionCloseMessage());
}
- catch (Throwable ignore)
+ catch (Throwable e)
{
// Session close should always return without exception
+
+ //Note - we only log at trace
+ log.trace("Failed to close session", e);
}
doCleanup();
@@ -829,114 +840,120 @@ public synchronized void handleFailover(final RemotingConnection backupConnectio
else
{
// The session wasn't found on the server - probably we're failing over onto a backup server where the
- // session
- // won't exist or the target server has been restarted - in this case the session will need to be recreated,
+ // session won't exist or the target server has been restarted - in this case the session will need to be recreated,
// and we'll need to recreate any consumers
-
- Packet createRequest = new CreateSessionMessage(name,
- channel.getID(),
- version,
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindowSize);
- boolean retry = false;
- do
+
+ // It could also be that the server hasn't been restarted, but the session is currently executing close, and that
+ // has already been executed on the server, that's why we can't find the session- in this case we *don't* want
+ // to recreate the session, we just want to unblock the blocking call
+ if (!inClose)
{
- try
- {
- channel1.sendBlocking(createRequest);
- retry = false;
- }
- catch (HornetQException e)
+ Packet createRequest = new CreateSessionMessage(name,
+ channel.getID(),
+ version,
+ username,
+ password,
+ minLargeMessageSize,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ confirmationWindowSize);
+ boolean retry = false;
+ do
{
- // the session was created while its server was starting, retry it:
- if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ try
{
- log.warn("Server is starting, retry to create the session " + name);
- retry = true;
- // sleep a little bit to avoid spinning too much
- Thread.sleep(10);
+ channel1.sendBlocking(createRequest);
+ retry = false;
}
- else
+ catch (HornetQException e)
{
- throw e;
+ // the session was created while its server was starting, retry it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ log.warn("Server is starting, retry to create the session " + name);
+ retry = true;
+ // sleep a little bit to avoid spinning too much
+ Thread.sleep(10);
+ }
+ else
+ {
+ throw e;
+ }
}
}
- }
- while (retry);
-
- channel.clearCommands();
-
- for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
- {
- SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
- entry.getValue()
- .getQueueName(),
- entry.getValue()
- .getFilterString(),
- entry.getValue()
- .isBrowseOnly(),
- false);
-
- createConsumerRequest.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
-
- conn.write(buffer, false);
-
- int clientWindowSize = entry.getValue().getClientWindowSize();
-
- if (clientWindowSize != 0)
+ while (retry);
+
+ channel.clearCommands();
+
+ for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
{
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
- clientWindowSize);
-
- packet.setChannelID(channel.getID());
-
- buffer = packet.encode(channel.getConnection());
-
+ SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
+ entry.getValue()
+ .getQueueName(),
+ entry.getValue()
+ .getFilterString(),
+ entry.getValue()
+ .isBrowseOnly(),
+ false);
+
+ createConsumerRequest.setChannelID(channel.getID());
+
+ Connection conn = channel.getConnection().getTransportConnection();
+
+ HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
+
conn.write(buffer, false);
+
+ int clientWindowSize = entry.getValue().getClientWindowSize();
+
+ if (clientWindowSize != 0)
+ {
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+ clientWindowSize);
+
+ packet.setChannelID(channel.getID());
+
+ buffer = packet.encode(channel.getConnection());
+
+ conn.write(buffer, false);
+ }
}
- }
-
- if ((!autoCommitAcks || !autoCommitSends) && workDone)
- {
- // Session is transacted - set for rollback only
- // FIXME - there is a race condition here - a commit could sneak in before this is set
- rollbackOnly = true;
- }
-
- // Now start the session if it was already started
- if (started)
- {
- for (ClientConsumerInternal consumer : consumers.values())
+
+ if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
- consumer.clearAtFailover();
- consumer.start();
+ // Session is transacted - set for rollback only
+ // FIXME - there is a race condition here - a commit could sneak in before this is set
+ rollbackOnly = true;
}
-
- Packet packet = new PacketImpl(PacketImpl.SESS_START);
-
- packet.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer = packet.encode(channel.getConnection());
-
- conn.write(buffer, false);
+
+ // Now start the session if it was already started
+ if (started)
+ {
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clearAtFailover();
+ consumer.start();
+ }
+
+ Packet packet = new PacketImpl(PacketImpl.SESS_START);
+
+ packet.setChannelID(channel.getID());
+
+ Connection conn = channel.getConnection().getTransportConnection();
+
+ HornetQBuffer buffer = packet.encode(channel.getConnection());
+
+ conn.write(buffer, false);
+ }
+
+ resetCreditManager = true;
}
-
- resetCreditManager = true;
-
+
channel.returnBlocking();
}
+
}
catch (Throwable t)
{
@@ -1007,8 +1024,6 @@ public void commit(final Xid xid, final boolean onePhase) throws XAException
if (rollbackOnly)
{
- rollback(xid);
-
throw new XAException(XAException.XA_RBOTHER);
}

0 comments on commit 02dee30

Please sign in to comment.