diff --git a/src/main/java/org/buddycloud/channelserver/queue/FederatedQueueManager.java b/src/main/java/org/buddycloud/channelserver/queue/FederatedQueueManager.java index 73998c01..68f2e894 100644 --- a/src/main/java/org/buddycloud/channelserver/queue/FederatedQueueManager.java +++ b/src/main/java/org/buddycloud/channelserver/queue/FederatedQueueManager.java @@ -1,11 +1,12 @@ package org.buddycloud.channelserver.queue; +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import org.buddycloud.channelserver.ChannelsEngine; @@ -14,7 +15,6 @@ import org.dom4j.Element; import org.dom4j.Namespace; import org.dom4j.dom.DOMElement; -import org.hsqldb.Server; import org.xmpp.component.ComponentException; import org.xmpp.packet.IQ; import org.xmpp.packet.JID; @@ -41,14 +41,14 @@ public class FederatedQueueManager { private ConcurrentHashMap remoteServerItemsToProcess = new ConcurrentHashMap(); private ConcurrentHashMap remoteServerInfoRequestIds = new ConcurrentHashMap(); private ConcurrentHashMap> waitingStanzas = new ConcurrentHashMap>(); - + + private ConcurrentHashMap idMap = new ConcurrentHashMap(); + private ExpiringPacketQueue sentRemotePackets = new ExpiringPacketQueue(); private ExpiringPacketQueue nodeMap = new ExpiringPacketQueue(); private String localServer; - private BlockingQueue federatedResponseQueue; - public FederatedQueueManager(ChannelsEngine component, String localServer) { this.component = component; this.localServer = localServer; @@ -66,7 +66,12 @@ private int getId() { public void process(Packet packet) throws ComponentException { String to = packet.getTo().toString(); - sentRemotePackets.put(packet.getID(), packet.getFrom()); + + String uniqueId = generateUniqueId(packet); + idMap.put(uniqueId, packet.getID()); + packet.setID(uniqueId); + + sentRemotePackets.put(uniqueId, packet.getFrom()); try { extractNodeDetails(packet); // Do we have a map already? @@ -245,10 +250,14 @@ public void passResponseToRequester(IQ packet) throws Exception { "Can not find original requesting packet! (ID:" + packet.getID() + ")"); } - packet.setTo((JID) sentRemotePackets.get(packet.getID())); + + String uniqueId = packet.getID(); + packet.setID(idMap.get(uniqueId)); + packet.setTo((JID) sentRemotePackets.get(uniqueId)); packet.setFrom(localServer); sentRemotePackets.remove(packet.getID()); - + idMap.remove(uniqueId); + component.sendPacket(packet); } @@ -271,4 +280,46 @@ public void addChannelMap(JID server) { logger.error(e); } } + + /** + * Generate a unique ID for a packet + * + * Supplied packet IDs might not be unique so we use the ID and the FROM + * values to create a hash which we map back to the original packet ID. + * + * @param packet + * @return unique ID for the packet + */ + private String generateUniqueId(Packet packet) { + return generateMd5(packet.getID() + packet.getFrom()); + } + + /** + * Generates an MD5 hash of a supplied String + * + * @param message to encode + * @return MD5 Hash of supplied string + */ + private String generateMd5(String message) { + String digest = null; + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] hash = md.digest(message.getBytes("UTF-8")); + + //converting byte array to Hexadecimal String + StringBuilder sb = new StringBuilder(2*hash.length); + for(byte b : hash) { + sb.append(String.format("%02x", b&0xff)); + } + + digest = sb.toString(); + } catch (UnsupportedEncodingException e) { + logger.info("Error generating unique packet ID"); + logger.error(e); + } catch (NoSuchAlgorithmException e) { + logger.info("Error generating unique packet ID"); + logger.error(e); + } + return digest; + } } \ No newline at end of file diff --git a/src/test/java/org/buddycloud/channelserver/queue/FederatedQueueManagerTest.java b/src/test/java/org/buddycloud/channelserver/queue/FederatedQueueManagerTest.java index 12348832..81a30f45 100644 --- a/src/test/java/org/buddycloud/channelserver/queue/FederatedQueueManagerTest.java +++ b/src/test/java/org/buddycloud/channelserver/queue/FederatedQueueManagerTest.java @@ -161,7 +161,7 @@ public void testPassingChannelServerIdentifierViaItemsResultsInQueuedPacketSendi Assert.assertEquals(expectedForwaredPacket.toXML(), originalPacketRedirected.toXML()); } - + @Test public void testOutgoingFederatedPacketsAreRoutedBackToOriginalSender() throws Exception { @@ -175,14 +175,58 @@ public void testOutgoingFederatedPacketsAreRoutedBackToOriginalSender() throws E packet.getElement().addAttribute("remote-server-discover", "false"); queueManager.process(packet.createCopy()); - channelsEngine.poll(); - - IQ response = IQ.createResultIQ(packet); + IQ originalPacketRedirected = (IQ) channelsEngine.poll(); + + IQ response = IQ.createResultIQ(originalPacketRedirected); queueManager.passResponseToRequester(response); - + Assert.assertEquals(1, channelsEngine.size()); Packet redirected = channelsEngine.poll(); - + + System.out.println(packet); + System.out.println(redirected); + Assert.assertEquals(packet.getFrom(), redirected.getTo()); } + + @Test + public void testOutgoingFederatedPacketsFromDifferentClientsUsingSameIdAreRoutedBackToOriginalSender() throws Exception { + + channelsEngine.clear(); + + IQ clientOnePacket = new IQ(); + clientOnePacket.setID("1:some-request"); + clientOnePacket.setFrom(new JID("romeo@montague.lit/street")); + clientOnePacket.setTo(new JID("topics.capulet.lit")); + clientOnePacket.setType(IQ.Type.get); + clientOnePacket.getElement().addAttribute("remote-server-discover", "false"); + + IQ clientTwoPacket = new IQ(); + clientTwoPacket.setID("1:some-request"); + clientTwoPacket.setFrom(new JID("juliet@montague.lit/street")); + clientTwoPacket.setTo(new JID("topics.capulet.lit")); + clientTwoPacket.setType(IQ.Type.get); + clientTwoPacket.getElement().addAttribute("remote-server-discover", "false"); + + queueManager.addChannelMap(new JID("topics.capulet.lit")); + + queueManager.process(clientOnePacket.createCopy()); + queueManager.process(clientTwoPacket.createCopy()); + + IQ clientOneOriginalPacketRedirected = (IQ) channelsEngine.poll(); + IQ clientTwoOriginalPacketRedirected = (IQ) channelsEngine.poll(); + + IQ clientOneResponse = IQ.createResultIQ(clientOneOriginalPacketRedirected); + queueManager.passResponseToRequester(clientOneResponse); + + IQ clientTwoResponse = IQ.createResultIQ(clientTwoOriginalPacketRedirected); + queueManager.passResponseToRequester(clientTwoResponse); + + Assert.assertEquals(2, channelsEngine.size()); + Packet clientOneRedirected = channelsEngine.poll(); + Packet clientTwoRedirected = channelsEngine.poll(); + + Assert.assertEquals(clientOnePacket.getFrom(), clientOneRedirected.getTo()); + Assert.assertEquals(clientTwoPacket.getFrom(), clientTwoRedirected.getTo()); + } } \ No newline at end of file