Skip to content

Commit

Permalink
[#2230] Support limiting resource usage in hono-client
Browse files Browse the repository at this point in the history
The client now supports setting
- the max frame size that should be used for outgoing connections and
- the max-message-size to be used for receiver links created by the
client,
- the minimum max-message-size that the peer is required to support for
sender links created by the client and
- the incoming window size of sessions created by the client.

Signed-off-by: Kai Hudalla <kai.hudalla@bosch.io>
  • Loading branch information
sophokles73 committed Oct 14, 2020
1 parent 16310b4 commit 7dd7b1b
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ protected Future<ProtonConnection> connectToAdapter() {

options.setConnectTimeout(properties.getConnectTimeout());
options.setHeartbeat(properties.getHeartbeatInterval());
options.setMaxFrameSize(properties.getMaxFrameSize());
Optional.ofNullable(properties.getAmqpHostname()).ifPresent(s -> options.setVirtualHost(s));

addTlsTrustOptions(options, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.security.sasl.AuthenticationException;

import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.DisconnectListener;
Expand Down Expand Up @@ -62,6 +63,7 @@
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.MechanismMismatchException;
import io.vertx.proton.sasl.SaslSystemException;

Expand Down Expand Up @@ -119,6 +121,8 @@ public class HonoConnectionImpl implements HonoConnection {
private AtomicInteger connectAttempts;
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Tracer tracer = NoopTracerFactory.create();
private ProtonSession session;
private Handler<AsyncResult<ProtonConnection>> connectionCloseHandler;

/**
* Creates a new client for a set of configuration properties.
Expand Down Expand Up @@ -301,7 +305,7 @@ private void checkConnected(final Handler<AsyncResult<Void>> resultHandler, fina
* @return {@code true} if the connection is established.
*/
protected boolean isConnectedInternal() {
return connection != null && !connection.isDisconnected();
return connection != null && !connection.isDisconnected() && session != null;
}

@Override
Expand All @@ -313,10 +317,12 @@ public final boolean isShutdown() {
* Sets the connection used to interact with the Hono server.
*
* @param connection The connection to use.
* @param session The session to use for links created on the connection.
*/
void setConnection(final ProtonConnection connection) {
void setConnection(final ProtonConnection connection, final ProtonSession session) {
synchronized (connectionLock) {
this.connection = connection;
this.session = session;
if (connection == null) {
this.offeredCapabilities = Collections.emptyList();
context = null;
Expand Down Expand Up @@ -405,12 +411,13 @@ private void connect(
connectionFactory.getServerRole());

clientOptions = options;
connectionCloseHandler = remoteClose -> onRemoteClose(remoteClose, disconnectHandler);
connectionFactory.connect(
clientOptions,
null,
null,
containerId,
remoteClose -> onRemoteClose(remoteClose, disconnectHandler),
connectionCloseHandler,
failedConnection -> onRemoteDisconnect(failedConnection, disconnectHandler),
conAttempt -> {
connecting.compareAndSet(true, false);
Expand All @@ -436,7 +443,8 @@ private void connect(
connectionFactory.getPort(),
connectionFactory.getServerRole(),
newConnection.getRemoteContainer());
setConnection(newConnection);
final ProtonSession session = createDefaultSession(newConnection);
setConnection(newConnection, session);
wrappedConnectionHandler.handle(Future.succeededFuture(this));
}
}
Expand All @@ -453,7 +461,7 @@ private void onRemoteClose(final AsyncResult<ProtonConnection> remoteClose,
final Handler<ProtonConnection> connectionLossHandler) {

if (remoteClose.failed()) {
log.info("remote server [{}:{}, role: {}] closed connection with error condition: {}",
log.info("remote server [{}:{}, role: {}] closed connection: {}",
connectionFactory.getHost(),
connectionFactory.getPort(),
connectionFactory.getServerRole(),
Expand Down Expand Up @@ -511,7 +519,8 @@ private void notifyReconnectHandlers(final AsyncResult<HonoConnection> reconnect
*/
protected void clearState() {

setConnection(null);
connectionCloseHandler = null;
setConnection(null, null);

notifyDisconnectHandlers();
// make sure we make configured number of attempts to re-connect
Expand Down Expand Up @@ -707,6 +716,33 @@ public void closeAndFree(
}
}

private ProtonSession createDefaultSession(final ProtonConnection connection) {

if (connection == null) {
throw new IllegalStateException("no connection to create session for");
} else {
log.debug("establishing AMQP session with server [{}:{}, role: {}]",
connectionFactory.getHost(),
connectionFactory.getPort(),
connectionFactory.getServerRole());
final ProtonSession session = connection.createSession();
session.closeHandler(remoteClose -> {
final StringBuilder msgBuilder = new StringBuilder("the connection's session closed unexpectedly");
Optional.ofNullable(session.getRemoteCondition())
.ifPresent(error -> {
msgBuilder.append(String.format(" [condition: %s, description: %s]",
error.getCondition(), error.getDescription()));
});
session.close();
Optional.ofNullable(connectionCloseHandler)
.ifPresent(ch -> ch.handle(Future.failedFuture(msgBuilder.toString())));
});
session.setIncomingCapacity(clientConfigProperties.getMaxSessionWindowSize());
session.open();
return session;
}
}

/**
* Creates a sender link.
*
Expand Down Expand Up @@ -739,7 +775,7 @@ public final Future<ProtonSender> createSender(
}

final Promise<ProtonSender> senderPromise = Promise.promise();
final ProtonSender sender = connection.createSender(targetAddress);
final ProtonSender sender = session.createSender(targetAddress);
sender.setQoS(qos);
sender.setAutoSettle(true);
final DisconnectListener<HonoConnection> disconnectBeforeOpenListener = (con) -> {
Expand Down Expand Up @@ -769,9 +805,21 @@ public final Future<ProtonSender> createSender(

} else if (HonoProtonHelper.isLinkEstablished(sender)) {

log.debug("sender open [target: {}, sendQueueFull: {}]", targetAddress, sender.sendQueueFull());
// wait on credits a little time, if not already given
if (sender.getCredit() <= 0) {
log.debug("sender open [target: {}, sendQueueFull: {}, remote max-message-size: {}]",
targetAddress, sender.sendQueueFull(), sender.getRemoteMaxMessageSize());
final long remoteMaxMessageSize = Optional.ofNullable(sender.getRemoteMaxMessageSize())
.map(UnsignedLong::longValue)
.orElse(0L);
if (remoteMaxMessageSize > 0 && remoteMaxMessageSize < clientConfigProperties.getMinMaxMessageSize()) {
// peer won't accept our (biggest) messages
sender.close();
final String msg = String.format(
"peer does not support minimum max-message-size [required: {}, supported: {}",
clientConfigProperties.getMinMaxMessageSize(), remoteMaxMessageSize);
log.debug(msg);
senderPromise.tryFail(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED, msg));
} else if (sender.getCredit() <= 0) {
// wait on credits a little time, if not already given
final long waitOnCreditsTimerId = vertx.setTimer(clientConfigProperties.getFlowLatency(),
timerID -> {
log.debug("sender [target: {}] has {} credits after grace period of {}ms",
Expand Down Expand Up @@ -856,7 +904,10 @@ public Future<ProtonReceiver> createReceiver(
return executeOnContext(result -> {
checkConnected().compose(v -> {
final Promise<ProtonReceiver> receiverPromise = Promise.promise();
final ProtonReceiver receiver = connection.createReceiver(sourceAddress);
final ProtonReceiver receiver = session.createReceiver(sourceAddress);
if (clientConfigProperties.getMaxMessageSize() > ClientConfigProperties.MAX_MESSAGE_SIZE_UNLIMITED) {
receiver.setMaxMessageSize(new UnsignedLong(clientConfigProperties.getMaxMessageSize()));
}
receiver.setAutoAccept(autoAccept);
receiver.setQoS(qos);
receiver.setPrefetch(preFetchSize);
Expand Down
Loading

0 comments on commit 7dd7b1b

Please sign in to comment.