Skip to content

Commit

Permalink
ARTEMIS-2205 Optimizing some Lambda usages
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and clebertsuconic committed Dec 17, 2018
1 parent 6d93d0a commit c11605e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 59 deletions.
Expand Up @@ -33,7 +33,6 @@
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
Expand All @@ -53,6 +52,7 @@
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
Expand All @@ -78,7 +78,7 @@
/**
* This is the Equivalent for the ServerConsumer
*/
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback {
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {

private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);

Expand All @@ -91,7 +91,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();

private Consumer brokerConsumer;

private ReadyListener onflowControlReady;
protected final AMQPSessionContext protonSession;
protected final Sender sender;
protected final AMQPConnectionContext connection;
Expand All @@ -116,6 +116,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
* to sync the credits we have versus the credits that are being held in proton
* */
private final Object creditsLock = new Object();
private final java.util.function.Consumer<? super MessageReference> executeDelivery;

public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
Expand All @@ -126,6 +127,7 @@ public ProtonServerSenderContext(AMQPConnectionContext connection,
this.sender = sender;
this.protonSession = protonSession;
this.sessionSPI = server;
this.executeDelivery = this::executeDelivery;
}

public Object getBrokerConsumer() {
Expand Down Expand Up @@ -163,7 +165,7 @@ public void run() {
}

public boolean hasCredits() {
if (!connection.flowControl(brokerConsumer::promptDelivery)) {
if (!connection.flowControl(onflowControlReady)) {
return false;
}

Expand Down Expand Up @@ -489,6 +491,7 @@ public void initialise() throws Exception {
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
onflowControlReady = brokerConsumer::promptDelivery;
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (ActiveMQSecurityException e) {
Expand Down Expand Up @@ -745,7 +748,7 @@ public int deliverMessage(final MessageReference messageReference, final ServerC
}

if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
messageReference.setCallback(this);
messageReference.onDelivery(executeDelivery);
connection.runNow((Runnable)messageReference);
} else {
connection.runNow(() -> executeDelivery(messageReference));
Expand All @@ -758,8 +761,7 @@ public int deliverMessage(final MessageReference messageReference, final ServerC
}
}

@Override
public void executeDelivery(MessageReference messageReference) {
private void executeDelivery(MessageReference messageReference) {

try {
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
Expand Down
Expand Up @@ -18,13 +18,13 @@

import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>

private long messageSize = -1;

private MessageReferenceCallback callback;
private Consumer<? super MessageReference> onDelivery;

@Override
public Object getProtocolData() {
Expand All @@ -93,22 +93,26 @@ public Message getMessage() {
}

@Override
public void setCallback(MessageReferenceCallback callback) {
this.callback = callback;
public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null;
this.onDelivery = onDelivery;
}

/**
* It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
*/
@Override
public void run() {
MessageReferenceCallback callback = this.callback;

try {
if (callback != null) {
callback.executeDelivery(this);
final Consumer<? super MessageReference> onDelivery = this.onDelivery;
if (onDelivery != null) {
try {
onDelivery.accept(this);
} finally {
this.onDelivery = null;
}
} finally {
this.callback = null;
}
}

@Override
public synchronized PagedMessage getPagedMessage() {
PagedMessage returnMessage = message != null ? message.get() : null;
Expand Down
Expand Up @@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.server;


import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -44,7 +46,14 @@ public static MessageReference createReference(Message encode, final Queue queue

SimpleString getLastValueProperty();

void setCallback(MessageReferenceCallback callback);
/**
* This is to be used in cases where a message delivery happens on an executor.
* Most MessageReference implementations will allow execution, and if it does,
* and the protocol requires an execution per message, this callback may be used.
*
* At the time of this implementation only AMQP was used.
*/
void onDelivery(Consumer<? super MessageReference> callback);

/**
* We define this method aggregation here because on paging we need to hold the original estimate,
Expand Down

This file was deleted.

Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
Expand All @@ -33,7 +34,6 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
Expand All @@ -50,6 +50,7 @@
* This is useful for example, for stock prices, where you're only interested in the latest value
* for a particular stock
*/
@SuppressWarnings("ALL")
public class LastValueQueue extends QueueImpl {

private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -238,7 +239,7 @@ private static class HolderReference implements MessageReference {
}

@Override
public void setCallback(MessageReferenceCallback callback) {
public void onDelivery(Consumer<? super MessageReference> callback) {
// HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
}

Expand Down
Expand Up @@ -17,12 +17,12 @@
package org.apache.activemq.artemis.core.server.impl;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm

private Object protocolData;

private MessageReferenceCallback callback;
private Consumer<? super MessageReference> onDelivery;

// Static --------------------------------------------------------

Expand Down Expand Up @@ -88,20 +88,23 @@ public MessageReferenceImpl(final Message message, final Queue queue) {
// MessageReference implementation -------------------------------

@Override
public void setCallback(MessageReferenceCallback callback) {
this.callback = callback;
public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null;
this.onDelivery = onDelivery;
}

/**
* It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
*/
@Override
public void run() {
MessageReferenceCallback callback = this.callback;

try {
if (callback != null) {
callback.executeDelivery(this);
final Consumer<? super MessageReference> onDelivery = this.onDelivery;
if (onDelivery != null) {
try {
onDelivery.accept(this);
} finally {
this.onDelivery = null;
}
} finally {
this.callback = null;
}
}

Expand Down

0 comments on commit c11605e

Please sign in to comment.