Skip to content

Commit

Permalink
Reverting last commit on ordering & failover since it didn't fix the …
Browse files Browse the repository at this point in the history
…issue
  • Loading branch information
clebertsuconic committed Dec 5, 2009
1 parent 29b310f commit 56a19df
Showing 1 changed file with 35 additions and 45 deletions.
80 changes: 35 additions & 45 deletions src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Expand Up @@ -68,8 +68,6 @@ public class ChannelImpl implements Channel

private final Object sendBlockingLock = new Object();

private final Object replayLock = new Object();

private boolean failingOver;

private final int confWindowSize;
Expand Down Expand Up @@ -110,7 +108,7 @@ public Lock getLock()
{
return lock;
}

public int getConfirmationWindowSize()
{
return confWindowSize;
Expand Down Expand Up @@ -147,7 +145,7 @@ public void send(final Packet packet)

// This must never called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
{
{
synchronized (sendLock)
{
packet.setChannelID(id);
Expand Down Expand Up @@ -331,20 +329,15 @@ public void transferConnection(final RemotingConnection newConnection)

public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
{
// need to make sure we won't clear any packets while replaying or we could
// break order eventually
synchronized (replayLock)
if (resendCache != null)
{
if (resendCache != null)
clearUpTo(otherLastReceivedCommandID);

for (final Packet packet : resendCache)
{
clearUpTo(otherLastReceivedCommandID);

for (final Packet packet : resendCache)
{
packet.setChannelID(newChannelID);

doWrite(packet);
}
packet.setChannelID(newChannelID);

doWrite(packet);
}
}
}
Expand Down Expand Up @@ -394,7 +387,7 @@ public void confirm(final Packet packet)
{
lastReceivedCommandID++;

receivedBytes += packet.getPacketSize();
receivedBytes += packet.getPacketSize();

if (receivedBytes >= confWindowSize)
{
Expand Down Expand Up @@ -474,42 +467,39 @@ private void doWrite(final Packet packet)

private void clearUpTo(final int lastReceivedCommandID)
{
synchronized (replayLock)
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;

if (numberToClear == -1)
{
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
}

if (numberToClear == -1)
{
throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
}
int sizeToFree = 0;

int sizeToFree = 0;
for (int i = 0; i < numberToClear; i++)
{
final Packet packet = resendCache.poll();

for (int i = 0; i < numberToClear; i++)
if (packet == null)
{
final Packet packet = resendCache.poll();

if (packet == null)
{
log.warn("Can't find packet to clear: " + " last received command id " +
lastReceivedCommandID +
" first stored command id " +
firstStoredCommandID);
return;
}

if (packet.getType() != PACKETS_CONFIRMED)
{
sizeToFree += packet.getPacketSize();
}
log.warn("Can't find packet to clear: " + " last received command id " +
lastReceivedCommandID +
" first stored command id " +
firstStoredCommandID);
return;
}

if (commandConfirmationHandler != null)
{
commandConfirmationHandler.commandConfirmed(packet);
}
if (packet.getType() != PACKETS_CONFIRMED)
{
sizeToFree += packet.getPacketSize();
}

firstStoredCommandID += numberToClear;
if (commandConfirmationHandler != null)
{
commandConfirmationHandler.commandConfirmed(packet);
}
}

firstStoredCommandID += numberToClear;
}
}

0 comments on commit 56a19df

Please sign in to comment.