From 56a19dfc53c53186ce3d17c420ec47ec3766e2a3 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Sat, 5 Dec 2009 06:32:26 +0000 Subject: [PATCH] Reverting last commit on ordering & failover since it didn't fix the issue --- .../core/remoting/impl/ChannelImpl.java | 80 ++++++++----------- 1 file changed, 35 insertions(+), 45 deletions(-) diff --git a/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java b/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java index c8cafb12781..08b2b6c6c29 100644 --- a/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java +++ b/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java @@ -68,8 +68,6 @@ public class ChannelImpl implements Channel private final Object sendBlockingLock = new Object(); - private final Object replayLock = new Object(); - private boolean failingOver; private final int confWindowSize; @@ -110,7 +108,7 @@ public Lock getLock() { return lock; } - + public int getConfirmationWindowSize() { return confWindowSize; @@ -147,7 +145,7 @@ public void send(final Packet packet) // This must never called by more than one thread concurrently public void send(final Packet packet, final boolean flush) - { + { synchronized (sendLock) { packet.setChannelID(id); @@ -331,20 +329,15 @@ public void transferConnection(final RemotingConnection newConnection) public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID) { - // need to make sure we won't clear any packets while replaying or we could - // break order eventually - synchronized (replayLock) + if (resendCache != null) { - if (resendCache != null) + clearUpTo(otherLastReceivedCommandID); + + for (final Packet packet : resendCache) { - clearUpTo(otherLastReceivedCommandID); - - for (final Packet packet : resendCache) - { - packet.setChannelID(newChannelID); - - doWrite(packet); - } + packet.setChannelID(newChannelID); + + doWrite(packet); } } } @@ -394,7 +387,7 @@ public void confirm(final Packet packet) { lastReceivedCommandID++; - receivedBytes += packet.getPacketSize(); + receivedBytes += packet.getPacketSize(); if (receivedBytes >= confWindowSize) { @@ -474,42 +467,39 @@ private void doWrite(final Packet packet) private void clearUpTo(final int lastReceivedCommandID) { - synchronized (replayLock) + final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; + + if (numberToClear == -1) { - final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; + throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID); + } - if (numberToClear == -1) - { - throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID); - } + int sizeToFree = 0; - int sizeToFree = 0; + for (int i = 0; i < numberToClear; i++) + { + final Packet packet = resendCache.poll(); - for (int i = 0; i < numberToClear; i++) + if (packet == null) { - final Packet packet = resendCache.poll(); - - if (packet == null) - { - log.warn("Can't find packet to clear: " + " last received command id " + - lastReceivedCommandID + - " first stored command id " + - firstStoredCommandID); - return; - } - - if (packet.getType() != PACKETS_CONFIRMED) - { - sizeToFree += packet.getPacketSize(); - } + log.warn("Can't find packet to clear: " + " last received command id " + + lastReceivedCommandID + + " first stored command id " + + firstStoredCommandID); + return; + } - if (commandConfirmationHandler != null) - { - commandConfirmationHandler.commandConfirmed(packet); - } + if (packet.getType() != PACKETS_CONFIRMED) + { + sizeToFree += packet.getPacketSize(); } - firstStoredCommandID += numberToClear; + if (commandConfirmationHandler != null) + { + commandConfirmationHandler.commandConfirmed(packet); + } } + + firstStoredCommandID += numberToClear; } }