Skip to content

Commit

Permalink
Add prefetch parameter for AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed Mar 26, 2019
1 parent 7ac3895 commit d05a28e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
Expand Up @@ -152,6 +152,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="prefetch" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Number of messages to prefetch. The default is 5.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:extension>
</xsd:complexContent>
Expand Down
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

/**
* Async Update source for AMQP 0.9.1 brokers.
*
Expand All @@ -48,6 +50,7 @@
public class Amqp091AsyncUpdateSource implements AsyncUpdateSource {

private static final Trace LOGGER = TraceManager.getTrace(Amqp091AsyncUpdateSource.class);
private static final int DEFAULT_PREFETCH = 10;

@NotNull private final Amqp091SourceType sourceConfiguration;
@NotNull private final PrismContext prismContext;
Expand Down Expand Up @@ -97,6 +100,7 @@ private ListeningActivityImpl(AsyncUpdateMessageListener listener) {
state = State.PREPARING;
activeConnection = connectionFactory.newConnection();
activeChannel = activeConnection.createChannel();
activeChannel.basicQos(defaultIfNull(sourceConfiguration.getPrefetch(), DEFAULT_PREFETCH));
LOGGER.info("Opened AMQP connection = {}, channel = {}", activeConnection, activeChannel); // todo debug
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Expand Down

0 comments on commit d05a28e

Please sign in to comment.