Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

BOSH: store and re-send even reponses already acknowledged by the client

git-svn-id: https://svn.apache.org/repos/asf/mina/vysper/trunk@1365553 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 6d8cec41771ca76d12f3a58eb0bb1c6ca2e45b00 1 parent 2c2c4d7
Bernd Fondermann authored
View
68 ...ep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.SortedMap;
@@ -90,10 +91,16 @@
/*
* A cache of sent responses to the BOSH client, kept in the event of delivery failure and retransmission requests.
+ * these sent responses are moved to the sentResponsesBacklog when the client acks their receival.
* See Broken Connections in XEP-0124.
*/
private final SortedMap<Long, BoshResponse> sentResponses = new TreeMap<Long, BoshResponse>();
+ /**
+ * backlog of sent responses which have been acked by the client (and thus shouldn't ever been needed to be resent.
+ */
+ private final ResponsesBuffer sentResponsesBacklog = new ResponsesBuffer();
+
private int parallelRequestsCount = 2;
private String boshVersion = "1.9";
@@ -222,7 +229,7 @@ synchronized public void write(Stanza stanza) {
}
return;
}
- BoshRequest req = requestsWindow.getNextRequest(false);
+ BoshRequest req = requestsWindow.pollNext();
if (req == null) {
LOGGER.error("SID = " + getSessionId() + " - no request for sending");
return;
@@ -234,7 +241,7 @@ synchronized public void write(Stanza stanza) {
// collect more requests for this RID
while (rid.equals(requestsWindow.firstRid())) {
- final BoshRequest sameRidRequest = requestsWindow.getNextRequest(false);
+ final BoshRequest sameRidRequest = requestsWindow.pollNext();
boshRequestsForRID.add(sameRidRequest);
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - multi requests ({}) per RID.", rid, boshRequestsForRID.size());
}
@@ -256,7 +263,8 @@ synchronized public void write(Stanza stanza) {
// that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or
// the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
if (sentResponses.size() > maximumSentResponses || (!isClientAcknowledgements() && sentResponses.size() > parallelRequestsCount)) {
- sentResponses.remove(sentResponses.firstKey());
+ final Long key = sentResponses.firstKey();
+ sentResponsesBacklog.add(key, sentResponses.remove(key));
}
}
}
@@ -265,7 +273,8 @@ synchronized public void write(Stanza stanza) {
synchronized (sentResponses) {
LOGGER.warn("stored sent responses ({}) exeeds maximum ({}). purging.", sentResponses.size(), maximumSentResponses);
while (sentResponses.size() > maximumSentResponses) {
- sentResponses.remove(sentResponses.firstKey());
+ final Long key = sentResponses.firstKey();
+ sentResponsesBacklog.add(key, sentResponses.remove(key));
}
}
}
@@ -318,7 +327,7 @@ public void sendError(String condition) {
* @param condition the error condition
*/
protected void sendError(BoshRequest req, String condition) {
- req = req == null ? requestsWindow.getNextRequest(false) : req;
+ req = req == null ? requestsWindow.pollNext() : req;
if (req == null) {
LOGGER.warn("SID = " + getSessionId() + " - no request for sending BOSH error " + condition);
endSession(SessionTerminationCause.CONNECTION_ABORT);
@@ -347,7 +356,7 @@ public void close() {
// respond to all the queued HTTP requests with termination responses
synchronized (requestsWindow) {
BoshRequest next;
- while ((next = requestsWindow.getNextRequest(false)) != null) {
+ while ((next = requestsWindow.pollNext()) != null) {
Stanza body = BoshStanzaUtils.TERMINATE_BOSH_RESPONSE;
BoshResponse boshResponse = getBoshResponse(body, null);
if (LOGGER.isDebugEnabled()) {
@@ -569,20 +578,37 @@ public void insertRequest(final BoshRequest br) {
if (resend) {
// OLD: if (highestContinuousRid != null && rid <= highestContinuousRid) {
synchronized (sentResponses) {
- final String ridSeq = logRIDSequence();
- LOGGER.info("SID = " + getSessionId() + " - rid = {} - resend request. current buffer: {}", rid, ridSeq);
+ if (LOGGER.isInfoEnabled()) {
+ final String pendingRids = requestsWindow.logRequestWindow();
+ final String sentRids = logSentResponsesBuffer();
+ LOGGER.info("SID = " + getSessionId() + " - rid = {} - resend request. sent buffer: {} - req.win.: " + pendingRids, rid, sentRids);
+ }
if (sentResponses.containsKey(rid)) {
LOGGER.info("SID = " + getSessionId() + " - rid = {} (re-sending)", rid);
// Resending the old response
resendResponse(br);
} else {
- // rid not in sent responses. check to see if rid is still in requests window
- // to give a more qualified error
+ // not in sent responses, try alternatives: backlog and requestWindow
+
+ final BoshResponse response = sentResponsesBacklog.lookup(rid);
+ if (response != null) {
+ LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response retrieved from sentResponsesBacklog", rid);
+ resendResponse(br, rid, response);
+ return; // no error
+ }
+
+ // rid not in sent responses, nor backlog. check to see if rid is still in requests window
boolean inRequestsWindow = requestsWindow.containsRid(rid);
if (!inRequestsWindow) {
- LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response not in buffer error", rid);
+ if (LOGGER.isWarnEnabled()) {
+ final String sentRids = logSentResponsesBuffer();
+ LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response not in buffer error - " + sentRids, rid);
+ }
} else {
- LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response still in requests window ", rid);
+ if (LOGGER.isWarnEnabled()) {
+ final String sentRids = logSentResponsesBuffer();
+ LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response still in requests window - " + sentRids, rid);
+ }
}
sendError(br, "item-not-found");
}
@@ -629,6 +655,7 @@ public void insertRequest(final BoshRequest br) {
if (boshOuterBody.getAttribute("ack") == null) {
// if there is no ack attribute present then the client confirmed it received all the responses to all the previous requests
// and we clear the cache
+ sentResponsesBacklog.addAll(sentResponses);
sentResponses.clear();
} else if (!sentResponses.isEmpty()) {
// After receiving a request with an 'ack' value less than the 'rid' of the last request that it has already responded to,
@@ -690,11 +717,22 @@ public void insertRequest(final BoshRequest br) {
writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
}
}
+
+ public String logSentResponsesBuffer() {
+ final StringBuffer logMsg = new StringBuffer("sent = [");
+ for (Iterator<Long> iterator = sentResponses.keySet().iterator(); iterator.hasNext(); ) {
+ Long sentRid = iterator.next();
+ logMsg.append(sentRid);
+ if (iterator.hasNext()) logMsg.append(", ");
+ }
+ logMsg.append("]");
+ return logMsg.toString();
+ }
protected void respondToPause(int pause) {
LOGGER.debug("SID = " + getSessionId() + " - Setting inactivity period to {}", pause);
currentInactivitySeconds = pause;
- while (requestsWindow.getNextRequest(true) != null) {
+ while (!requestsWindow.isEmpty()) {
writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
}
}
@@ -743,6 +781,10 @@ protected void handleAsyncEventError(AsyncEvent event) {
protected void resendResponse(BoshRequest br) {
final Long rid = br.getRid();
BoshResponse boshResponse = sentResponses.get(rid);
+ resendResponse(br, rid, boshResponse);
+ }
+
+ protected void resendResponse(BoshRequest br, Long rid, BoshResponse boshResponse) {
if (boshResponse == null) {
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - BOSH response could not (no longer) be retrieved for resending.", rid);
return;
View
8 ...tensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
@@ -113,21 +113,19 @@ public synchronized Long firstRid() {
* @return the next (by RID order) body to process
* @param peek TRUE: request is not removed from request window
*/
- public synchronized BoshRequest getNextRequest(boolean peek) {
+ public synchronized BoshRequest pollNext() {
if (queue.isEmpty()) return null;
String ridSeq = logRequestWindow();
currentProcessingRequest = Math.max(currentProcessingRequest, queue.peek().getRid());
if (currentProcessingRequest > highestContinuousRid) {
- LOGGER.debug("SID = " + sessionId + " - <= NULL, not current = " + currentProcessingRequest + " " + ridSeq);
+ LOGGER.debug("SID = " + sessionId + " - using RID = NULL, not current = " + currentProcessingRequest + " " + ridSeq);
return null;
}
- if (peek) return queue.peek();
-
final BoshRequest nextRequest = queue.poll();
- LOGGER.debug("SID = " + sessionId + " - " + (nextRequest == null ? " <= NULL" : "<= " + currentProcessingRequest) + " " + ridSeq);
+ LOGGER.debug("SID = " + sessionId + " - using RID = " + (nextRequest == null ? "NULL" : Long.toString(currentProcessingRequest)) + " " + ridSeq);
return nextRequest;
}
View
68 ...ensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/ResponsesBuffer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.vysper.xmpp.extension.xep0124;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ */
+public class ResponsesBuffer {
+
+ protected static final int CAPACITY = 30;
+
+ private static class Entry {
+ Long rid;
+ BoshResponse response;
+ }
+
+ private final Queue<Entry> sentResponsesBacklog = new ArrayBlockingQueue<Entry>(CAPACITY);
+
+ public void addAll(Map<Long, BoshResponse> responses) {
+ for (Map.Entry<Long, BoshResponse> mapEntry : responses.entrySet()) {
+ final Entry entry = new Entry();
+ entry.response = mapEntry.getValue();
+ entry.rid = mapEntry.getKey();
+ putEntry(entry);
+ }
+ }
+
+ public void add(Long rid, BoshResponse responses) {
+ final Entry entry = new Entry();
+ entry.response = responses;
+ entry.rid = rid;
+ putEntry(entry);
+ }
+
+ private void putEntry(Entry entry) {
+ if (sentResponsesBacklog.size() == CAPACITY) sentResponsesBacklog.poll();
+ sentResponsesBacklog.add(entry);
+ }
+
+ public BoshResponse lookup(Long rid) {
+ for (Entry entry : sentResponsesBacklog) {
+ if (entry.rid.equals(rid)) {
+ return entry.response;
+ }
+ }
+ return null;
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.