Skip to content

Commit

Permalink
[ibb] Ensure InBandBytestreamManager is a singleton
Browse files Browse the repository at this point in the history
InBandBytestreamManager followed an unusual pattern: Within the
connectionTermianted() callback, it would remove itself from the
'managers' map. This allowed for multiple instances of an
InBandBytestreamManager to exist for the same connection, causing all
kinds of issues.

This fixes the issue by changing InBandBytestreamManager to use the
Smack-idiomatic pattern used by managers.

We also do no longer reset the listeners if the connection is
termianted, as listeners (and handlers) typically persist until they
are explicitly removed by the user.

As positive side-effect, the number of indeterministic unit-tests,
caused by using Thread.sleep(), is reduced. The executor service in
InitiationListener was also removed, because the IQ handler is already
called asynchronously to the connections main loop.

Thanks to Anno van Vliet for reporting this.
  • Loading branch information
Flowdalic committed Jul 3, 2020
1 parent 0eeb894 commit ed02bcf
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,6 @@ public enum StanzaType {
public void connectionCreated(final XMPPConnection connection) {
// create the manager for this connection
InBandBytestreamManager.getByteStreamManager(connection);

// register shutdown listener
connection.addConnectionListener(new AbstractConnectionClosedListener() {

@Override
public void connectionTerminated() {
InBandBytestreamManager.getByteStreamManager(connection).disableService();
}

@Override
public void connected(XMPPConnection connection) {
InBandBytestreamManager.getByteStreamManager(connection);
}

});

}
});
}
Expand Down Expand Up @@ -206,6 +190,15 @@ public static synchronized InBandBytestreamManager getByteStreamManager(XMPPConn
private InBandBytestreamManager(XMPPConnection connection) {
super(connection);

connection.addConnectionListener(new AbstractConnectionClosedListener() {
@Override
public void connectionTerminated() {
// reset internal status
InBandBytestreamManager.this.sessions.clear();
InBandBytestreamManager.this.ignoredBytestreamRequests.clear();
}
});

// register bytestream open packet listener
this.initiationListener = new InitiationListener(this);
connection.registerIQRequestHandler(initiationListener);
Expand Down Expand Up @@ -453,19 +446,6 @@ protected void replyRejectPacket(IQ request) throws NotConnectedException, Inter
connection().sendStanza(error);
}

/**
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream open
* request is rejected because its block size is greater than the maximum allowed block size.
*
* @param request IQ stanza that should be answered with a resource-constraint error
* @throws NotConnectedException if the XMPP connection is not connected.
* @throws InterruptedException if the calling thread was interrupted.
*/
protected void replyResourceConstraintPacket(IQ request) throws NotConnectedException, InterruptedException {
IQ error = IQ.createErrorResponse(request, StanzaError.Condition.resource_constraint);
connection().sendStanza(error);
}

/**
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream
* session could not be found.
Expand Down Expand Up @@ -539,30 +519,4 @@ protected List<String> getIgnoredBytestreamRequests() {
return ignoredBytestreamRequests;
}

/**
* Disables the InBandBytestreamManager by removing its stanza listeners and resetting its
* internal status, which includes removing this instance from the managers map.
*/
private void disableService() {
final XMPPConnection connection = connection();

// remove manager from static managers map
managers.remove(connection);

// remove all listeners registered by this manager
connection.unregisterIQRequestHandler(initiationListener);
connection.unregisterIQRequestHandler(dataListener);
connection.unregisterIQRequestHandler(closeListener);

// shutdown threads
this.initiationListener.shutdown();

// reset internal status
this.userListeners.clear();
this.allRequestListeners.clear();
this.sessions.clear();
this.ignoredBytestreamRequests.clear();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@
*/
package org.jivesoftware.smackx.bytestreams.ibb;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.iqrequest.AbstractIqRequestHandler;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StanzaError;

import org.jivesoftware.smackx.bytestreams.BytestreamListener;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
Expand All @@ -44,14 +38,10 @@
* @author Henning Staib
*/
class InitiationListener extends AbstractIqRequestHandler {
private static final Logger LOGGER = Logger.getLogger(InitiationListener.class.getName());

/* manager containing the listeners and the XMPP connection */
private final InBandBytestreamManager manager;

/* executor service to process incoming requests concurrently */
private final ExecutorService initiationListenerExecutor;

/**
* Constructor.
*
Expand All @@ -60,40 +50,29 @@ class InitiationListener extends AbstractIqRequestHandler {
protected InitiationListener(InBandBytestreamManager manager) {
super(Open.ELEMENT, Open.NAMESPACE, IQ.Type.set, Mode.async);
this.manager = manager;
initiationListenerExecutor = Executors.newCachedThreadPool();
}

@Override
public IQ handleIQRequest(final IQ packet) {
initiationListenerExecutor.execute(new Runnable() {

@Override
public void run() {
try {
processRequest(packet);
}
catch (InterruptedException | NotConnectedException e) {
LOGGER.log(Level.WARNING, "proccessRequest", e);
}
}
});
return null;
}

private void processRequest(Stanza packet) throws NotConnectedException, InterruptedException {
Open ibbRequest = (Open) packet;
public IQ handleIQRequest(final IQ iqRequest) {
Open ibbRequest = (Open) iqRequest;

int blockSize = ibbRequest.getBlockSize();
int maximumBlockSize = manager.getMaximumBlockSize();
// validate that block size is within allowed range
if (ibbRequest.getBlockSize() > this.manager.getMaximumBlockSize()) {
this.manager.replyResourceConstraintPacket(ibbRequest);
return;
if (blockSize > maximumBlockSize) {
StanzaError error = StanzaError.getBuilder().setCondition(StanzaError.Condition.resource_constraint)
.setDescriptiveEnText("Requests block size of " + blockSize + " exceeds maximum block size of "
+ maximumBlockSize)
.build();
return IQ.createErrorResponse(iqRequest, error);
}

StreamNegotiator.signal(ibbRequest.getFrom().toString() + '\t' + ibbRequest.getSessionID(), ibbRequest);

// ignore request if in ignore list
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID()))
return;
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID())) {
return null;
}

// build bytestream request from packet
InBandBytestreamRequest request = new InBandBytestreamRequest(this.manager, ibbRequest);
Expand All @@ -102,7 +81,6 @@ private void processRequest(Stanza packet) throws NotConnectedException, Interru
BytestreamListener userListener = this.manager.getUserListener(ibbRequest.getFrom());
if (userListener != null) {
userListener.incomingBytestreamRequest(request);

}
else if (!this.manager.getAllRequestListeners().isEmpty()) {
/*
Expand All @@ -111,21 +89,16 @@ else if (!this.manager.getAllRequestListeners().isEmpty()) {
for (BytestreamListener listener : this.manager.getAllRequestListeners()) {
listener.incomingBytestreamRequest(request);
}

}
else {
/*
* if there is no listener for this initiation request, reply with reject message
*/
this.manager.replyRejectPacket(ibbRequest);
StanzaError error = StanzaError.getBuilder()
.setCondition(StanzaError.Condition.not_acceptable)
.setDescriptiveEnText("No file-transfer listeners registered")
.build();
return IQ.createErrorResponse(iqRequest, error);
}
}

/**
* Shuts down the listeners executor service.
*/
protected void shutdown() {
this.initiationListenerExecutor.shutdownNow();
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,14 @@ public void setup() {
*/
@Test
public void shouldRespondWithError() throws Exception {

// run the listener with the initiation packet
initiationListener.handleIQRequest(initBytestream);

// wait because packet is processed in an extra thread
Thread.sleep(200);

// capture reply to the In-Band Bytestream open request
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
verify(connection).sendStanza(argument.capture());
IQ response = initiationListener.handleIQRequest(initBytestream);

// assert that reply is the correct error packet
assertEquals(initiatorJID, argument.getValue().getTo());
assertEquals(IQ.Type.error, argument.getValue().getType());
assertEquals(initiatorJID, response.getTo());
assertEquals(IQ.Type.error, response.getType());
assertEquals(StanzaError.Condition.not_acceptable,
argument.getValue().getError().getCondition());

response.getError().getCondition());
}

/**
Expand All @@ -113,21 +104,13 @@ public void shouldRejectRequestWithTooBigBlockSize() throws Exception {
byteStreamManager.setMaximumBlockSize(1024);

// run the listener with the initiation packet
initiationListener.handleIQRequest(initBytestream);

// wait because packet is processed in an extra thread
Thread.sleep(200);

// capture reply to the In-Band Bytestream open request
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
verify(connection).sendStanza(argument.capture());
IQ response = initiationListener.handleIQRequest(initBytestream);

// assert that reply is the correct error packet
assertEquals(initiatorJID, argument.getValue().getTo());
assertEquals(IQ.Type.error, argument.getValue().getType());
assertEquals(initiatorJID, response.getTo());
assertEquals(IQ.Type.error, response.getType());
assertEquals(StanzaError.Condition.resource_constraint,
argument.getValue().getError().getCondition());

response.getError().getCondition());
}

/**
Expand Down Expand Up @@ -199,24 +182,17 @@ public void shouldNotInvokeListenerForUser() throws Exception {
byteStreamManager.addIncomingBytestreamListener(listener, JidCreate.from("other_" + initiatorJID));

// run the listener with the initiation packet
initiationListener.handleIQRequest(initBytestream);

// wait because packet is processed in an extra thread
Thread.sleep(200);
IQ response = initiationListener.handleIQRequest(initBytestream);

// assert listener is not called
ArgumentCaptor<BytestreamRequest> byteStreamRequest = ArgumentCaptor.forClass(BytestreamRequest.class);
verify(listener, never()).incomingBytestreamRequest(byteStreamRequest.capture());

// capture reply to the In-Band Bytestream open request
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
verify(connection).sendStanza(argument.capture());

// assert that reply is the correct error packet
assertEquals(initiatorJID, argument.getValue().getTo());
assertEquals(IQ.Type.error, argument.getValue().getType());
assertEquals(initiatorJID, response.getTo());
assertEquals(IQ.Type.error, response.getType());
assertEquals(StanzaError.Condition.not_acceptable,
argument.getValue().getError().getCondition());
response.getError().getCondition());
}

/**
Expand Down

0 comments on commit ed02bcf

Please sign in to comment.