Skip to content

Commit

Permalink
[#1515] Use configured initial credits in DestinationCommandConsumer.
Browse files Browse the repository at this point in the history
Signed-off-by: Carsten Lohmann <carsten.lohmann@bosch-si.com>
  • Loading branch information
calohmn authored and Kai Hudalla committed Dec 13, 2019
1 parent e5e5cae commit 627cee8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 13 deletions.
Expand Up @@ -332,13 +332,12 @@ public void disposition(final DeliveryState deliveryState, final int credit) {
* @param credits The number of credits.
* @throws IllegalArgumentException if credits is &lt; 1
*/
// TODO since both DestinationCommandConsumer and MappingAndDelegatingCommandConsumer use automatic credit handling now (i.e. prefetch > 0), this method can be removed
private void flow(final int credits) {
if (credits < 1) {
throw new IllegalArgumentException("credits must be positive");
}
if (receiver.getPrefetch() > 0) {
LOG.debug("will not flow credits because receiver prefetch is non-zero");
} else {
if (receiver.getPrefetch() <= 0) {
currentSpan.log(String.format("flowing %d credits to sender", credits));
receiver.flow(credits);
}
Expand Down
Expand Up @@ -250,8 +250,8 @@ public void removeHandlerAndCloseConsumerIfEmpty(final String deviceId, final Ha
* The underlying receiver link will be created with the following properties:
* <ul>
* <li><em>auto accept</em> will be set to {@code true}</li>
* <li><em>pre-fetch size</em> will be set to {@code 0} to enforce manual flow control.
* However, the sender will be issued one credit on link establishment.</li>
* <li><em>pre-fetch size</em> will be set to the number of initial credits configured
* for the given connection.</li>
* </ul>
*
* @param con The connection to the server.
Expand Down Expand Up @@ -302,7 +302,7 @@ public static Future<DestinationCommandConsumer> create(
}
consumer.handleCommandMessage(msg, delivery);
},
0, // no pre-fetching
con.getConfig().getInitialCredits(),
false, // no auto-accept
sourceAddress -> { // remote close hook
LOG.debug("command receiver link [tenant-id: {}, device-id: {}] closed remotely",
Expand All @@ -316,7 +316,6 @@ public static Future<DestinationCommandConsumer> create(
final DestinationCommandConsumer consumer = new DestinationCommandConsumer(
con, receiver, tenantId, gatewayOrDeviceId);
consumerRef.set(consumer);
receiver.flow(1); // allow sender to send one command
consumer.setLocalCloseHandler(sourceAddress -> {
LOG.debug("command receiver link [tenant-id: {}, device-id: {}] closed locally",
tenantId, gatewayOrDeviceId);
Expand Down
Expand Up @@ -204,7 +204,7 @@ public void testCreateCommandConsumerSetsRemoteCloseHandler(final TestContext ct
eq(deviceSpecificCommandAddress),
eq(ProtonQoS.AT_LEAST_ONCE),
any(ProtonMessageHandler.class),
eq(0),
eq(props.getInitialCredits()),
eq(false),
closeHookCaptor.capture());
// invoke close hook
Expand Down Expand Up @@ -232,7 +232,7 @@ public void testCreateCommandConsumerWithGatewaySetsRemoteCloseHandler(final Tes
eq(gatewaySpecificCommandAddress),
eq(ProtonQoS.AT_LEAST_ONCE),
any(ProtonMessageHandler.class),
eq(0),
eq(props.getInitialCredits()),
eq(false),
closeHookCaptor.capture());
// invoke close hook
Expand Down Expand Up @@ -285,7 +285,7 @@ public void testLocalCloseRemovesCommandConsumerFromCache(final TestContext ctx)
eq(deviceSpecificCommandAddress),
eq(ProtonQoS.AT_LEAST_ONCE),
any(ProtonMessageHandler.class),
eq(0),
eq(props.getInitialCredits()),
eq(false),
VertxMockSupport.anyHandler());
return newConsumer;
Expand Down Expand Up @@ -340,7 +340,7 @@ public void testConsumerIsRecreatedOnConnectionFailure(final TestContext ctx) {
eq(deviceSpecificCommandAddress),
eq(ProtonQoS.AT_LEAST_ONCE),
any(ProtonMessageHandler.class),
eq(0),
eq(props.getInitialCredits()),
eq(false),
VertxMockSupport.anyHandler());

Expand Down Expand Up @@ -390,7 +390,7 @@ public void testLivenessCheckLocksRecreationAttempt(final TestContext ctx) {
eq(deviceSpecificCommandAddress),
eq(ProtonQoS.AT_LEAST_ONCE),
any(ProtonMessageHandler.class),
eq(0),
eq(props.getInitialCredits()),
eq(false),
VertxMockSupport.anyHandler());

Expand All @@ -404,7 +404,7 @@ public void testLivenessCheckLocksRecreationAttempt(final TestContext ctx) {
eq(deviceSpecificCommandAddress),
eq(ProtonQoS.AT_LEAST_ONCE),
any(ProtonMessageHandler.class),
eq(0),
eq(props.getInitialCredits()),
eq(false),
VertxMockSupport.anyHandler());
}
Expand Down

0 comments on commit 627cee8

Please sign in to comment.