Skip to content

Commit

Permalink
Merge df32bf3 into 44ad01a
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowdalic committed Jun 9, 2022
2 parents 44ad01a + df32bf3 commit 12ede0e
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.GenericConnectionException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
import org.jivesoftware.smack.SmackException.SmackWrappedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.StreamErrorException;
import org.jivesoftware.smack.packet.Element;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Nonza;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StanzaError;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.CloseableUtil;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.xml.XmlPullParser;
Expand Down Expand Up @@ -90,6 +92,10 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
@SuppressWarnings("HidingField")
private final BOSHConfiguration config;

private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true);

private Thread writerThread;

// Some flags which provides some info about the current state.
private boolean isFirstInitialization = true;
private boolean done = false;
Expand Down Expand Up @@ -194,11 +200,16 @@ protected void connectInternal() throws SmackException, InterruptedException {
}
}

assert writerThread == null || !writerThread.isAlive();
outgoingQueue.start();
writerThread = Async.go(this::writeElements, this + " Writer");

// If there is no feedback, throw an remote server timeout error
if (!connected && !done) {
done = true;
String errorMessage = "Timeout reached for the connection to "
+ getHost() + ":" + getPort() + ".";
instantShutdown();
throw new SmackException.SmackMessageException(errorMessage);
}

Expand All @@ -207,6 +218,7 @@ protected void connectInternal() throws SmackException, InterruptedException {
"<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>");
onStreamOpen(parser);
} catch (XmlPullParserException | IOException e) {
instantShutdown();
throw new AssertionError("Failed to setup stream environment", e);
}
}
Expand Down Expand Up @@ -234,28 +246,70 @@ protected void loginInternal(String username, String password, Resourcepart reso
afterSuccessfulLogin(false);
}

@Override
public void sendNonza(Nonza element) throws NotConnectedException {
if (done) {
throw new NotConnectedException();
private volatile boolean writerThreadRunning;

private void writeElements() {
writerThreadRunning = true;
try {
while (true) {
TopLevelStreamElement element;
try {
element = outgoingQueue.take();
} catch (InterruptedException e) {
LOGGER.log(Level.FINE,
"Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception",
e);
return;
}

String xmlPayload = element.toXML(BOSH_URI).toString();
ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload);
if (sessionID != null) {
BodyQName qName = BodyQName.create(BOSH_URI, "sid");
composableBodyBuilder.setAttribute(qName, sessionID);
}

ComposableBody composableBody = composableBodyBuilder.build();

try {
client.send(composableBody);
} catch (BOSHException e) {
LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e);
// TODO: Signal the user that there was an unexpected exception.
return;
}

if (element instanceof Stanza) {
Stanza stanza = (Stanza) element;
firePacketSendingListeners(stanza);
}
}
} finally {
writerThreadRunning = false;
notifyWaitingThreads();
}
sendElement(element);
}

@Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException {
sendElement(packet);
}

private void sendElement(Element element) {
protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
throwNotConnectedExceptionIfAppropriate();
try {
send(ComposableBody.builder().setPayloadXML(element.toXML(BOSH_URI).toString()).build());
if (element instanceof Stanza) {
firePacketSendingListeners((Stanza) element);
}
outgoingQueue.put(element);
} catch (InterruptedException e) {
throwNotConnectedExceptionIfAppropriate();
// If the method above did not throw, then the sending thread was interrupted
throw e;
}
catch (BOSHException e) {
LOGGER.log(Level.SEVERE, "BOSHException in sendStanzaInternal", e);
}

@Override
protected void sendNonBlockingInternal(TopLevelStreamElement element)
throws NotConnectedException, OutgoingQueueFullException {
throwNotConnectedExceptionIfAppropriate();
boolean enqueued = outgoingQueue.offer(element);
if (!enqueued) {
throwNotConnectedExceptionIfAppropriate();
throw new OutgoingQueueFullException();
}
}

Expand All @@ -268,27 +322,37 @@ private void sendElement(Element element) {
*/
@Override
protected void shutdown() {

if (client != null) {
try {
client.disconnect();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "shutdown", e);
}
client = null;
}

instantShutdown();
}

@Override
public void instantShutdown() {
outgoingQueue.shutdown();

try {
boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning);
if (!writerThreadTerminated) {
LOGGER.severe("Writer thread of " + this + " did not terminate timely");
}
} catch (InterruptedException e) {
LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e);
}

setWasAuthenticated();
sessionID = null;
done = true;
authenticated = false;
connected = false;
isFirstInitialization = false;
client = null;

// Close down the readers and writers.
CloseableUtil.maybeClose(readerPipe, LOGGER);
Expand Down Expand Up @@ -410,14 +474,15 @@ protected void afterSaslAuthenticationSuccess()
// XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it
// requires a special XML element ot be send after successful SASL authentication.
// See XEP-0206 § 5., especially the following is example 5 of XEP-0206.
ComposableBody composeableBody = ComposableBody.builder().setNamespaceDefinition("xmpp",
XMPPBOSHConnection.XMPP_BOSH_NS).setAttribute(
BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart",
"xmpp"), "true").setAttribute(
BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()).build();
ComposableBody composeableBody = ComposableBody.builder()
.setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS)
.setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true")
.setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString())
.setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID)
.build();

try {
send(composeableBody);
client.send(composeableBody);
} catch (BOSHException e) {
// jbosh's exception API does not really match the one of Smack.
throw new SmackException.SmackWrappedException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
import org.jivesoftware.smack.SmackException.SecurityRequiredException;
Expand Down Expand Up @@ -460,8 +461,14 @@ public int getPort() {
@Override
public abstract boolean isSecureConnection();

protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException;
// TODO: Collection<? extends TopLevelStreamElement> elements?
protected abstract void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException;

// TODO: Collection<? extends TopLevelStreamElement> elements?
// But how would the non-blocking property look like? Fail if all could not be added to the queue? Try to add some, and then retry if the queue is not drained?
protected abstract void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException;

@SuppressWarnings("deprecation")
@Override
public boolean trySendStanza(Stanza stanza) throws NotConnectedException {
// Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be
Expand All @@ -476,6 +483,7 @@ public boolean trySendStanza(Stanza stanza) throws NotConnectedException {
return true;
}

@SuppressWarnings("deprecation")
@Override
public boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit)
throws NotConnectedException, InterruptedException {
Expand All @@ -486,7 +494,14 @@ public boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit)
}

@Override
public abstract void sendNonza(Nonza element) throws NotConnectedException, InterruptedException;
public final void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException {
sendInternal(nonza);
}

@Override
public final void sendNonzaNonBlocking(Nonza nonza) throws NotConnectedException, OutgoingQueueFullException {
sendNonBlockingInternal(nonza);
}

@Override
public abstract boolean isUsingCompression();
Expand Down Expand Up @@ -853,8 +868,7 @@ public final StanzaFactory getStanzaFactory() {
return stanzaFactory;
}

@Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
private Stanza preSendStanza(Stanza stanza) throws NotConnectedException {
Objects.requireNonNull(stanza, "Stanza must not be null");
assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;

Expand All @@ -873,7 +887,19 @@ public final void sendStanza(Stanza stanza) throws NotConnectedException, Interr
// Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify
// the content of the stanza.
Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
sendStanzaInternal(stanzaAfterInterceptors);
return stanzaAfterInterceptors;
}

@Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
stanza = preSendStanza(stanza);
sendInternal(stanza);
}

@Override
public final void sendStanzaNonBlocking(Stanza stanza) throws NotConnectedException, OutgoingQueueFullException {
stanza = preSendStanza(stanza);
sendNonBlockingInternal(stanza);
}

/**
Expand Down Expand Up @@ -2006,18 +2032,11 @@ public void run() {
}, timeout, TimeUnit.MILLISECONDS);

addAsyncStanzaListener(stanzaListener, replyFilter);
Runnable sendOperation = () -> {
try {
sendStanza(stanza);
}
catch (NotConnectedException | InterruptedException exception) {
future.setException(exception);
}
};
if (SmackConfiguration.TRUELY_ASYNC_SENDS) {
Async.go(sendOperation);
} else {
sendOperation.run();
try {
sendStanzaNonBlocking(stanza);
}
catch (NotConnectedException | OutgoingQueueFullException exception) {
future.setException(exception);
}

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,4 @@ public static void addAllKnownModulesTo(ModularXmppClientToServerConnectionConfi
}
}

/**
* If enabled, causes {@link AbstractXMPPConnection} to create a thread for every asynchronous send operation. This
* is meant to work-around a shortcoming of Smack 4.4, where certain send operations are not asynchronous even if
* they should be. This is an expert setting, do not toggle if you do not understand the consequences or have been
* told to do so. Note that it is expected that this will not be needed in future Smack versions.
*
* @since 4.4.6
*/
public static boolean TRUELY_ASYNC_SENDS = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ public NotConnectedException(XMPPConnection connection, StanzaFilter stanzaFilte
}
}

public static class OutgoingQueueFullException extends SmackException {

private static final long serialVersionUID = 1L;

}

public static class IllegalStateChangeException extends SmackException {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.IQReplyFilter;
import org.jivesoftware.smack.filter.StanzaFilter;
Expand Down Expand Up @@ -199,6 +200,8 @@ public interface XMPPConnection {
* */
void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException;

void sendStanzaNonBlocking(Stanza stanza) throws NotConnectedException, OutgoingQueueFullException;

/**
* Try to send the given stanza. Returns {@code true} if the stanza was successfully put into the outgoing stanza
* queue, otherwise, if {@code false} is returned, the stanza could not be scheduled for sending (for example
Expand All @@ -213,7 +216,10 @@ public interface XMPPConnection {
* @return {@code true} if the stanza was successfully scheduled to be send, {@code false} otherwise.
* @throws NotConnectedException if the connection is not connected.
* @since 4.4.0
* @deprecated use {@link #sendStanzaNonBlocking(Stanza)} instead.
*/
// TODO: Remove in Smack 4.7.
@Deprecated
boolean trySendStanza(Stanza stanza) throws NotConnectedException;

/**
Expand All @@ -234,7 +240,10 @@ public interface XMPPConnection {
* @throws NotConnectedException if the connection is not connected.
* @throws InterruptedException if the calling thread was interrupted.
* @since 4.4.0
* @deprecated use {@link #sendStanzaNonBlocking(Stanza)} instead.
*/
// TODO: Remove in Smack 4.7.
@Deprecated
boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit) throws NotConnectedException, InterruptedException;

/**
Expand All @@ -251,6 +260,8 @@ public interface XMPPConnection {
*/
void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException;

void sendNonzaNonBlocking(Nonza stanza) throws NotConnectedException, OutgoingQueueFullException;

/**
* Adds a connection listener to this connection that will be notified when
* the connection closes or fails.
Expand Down

0 comments on commit 12ede0e

Please sign in to comment.