From 4a63c1d589cf601163589989013914692d35d099 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 17 Dec 2018 09:12:19 -0500 Subject: [PATCH] ARTEMIS-2205 Optimizing some Lambda usages https://issues.apache.org/jira/browse/ARTEMIS-2205 --- .../proton/ProtonServerSenderContext.java | 16 ++++++----- .../paging/cursor/PagedReferenceImpl.java | 26 ++++++++++-------- .../artemis/core/server/MessageReference.java | 11 +++++++- .../core/server/MessageReferenceCallback.java | 27 ------------------- .../core/server/impl/LastValueQueue.java | 5 ++-- .../server/impl/MessageReferenceImpl.java | 25 +++++++++-------- 6 files changed, 51 insertions(+), 59 deletions(-) delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 843d1fedcca9..4caf2d0047ba 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; @@ -53,6 +52,7 @@ import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -79,7 +79,7 @@ /** * This is the Equivalent for the ServerConsumer */ -public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback { +public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class); @@ -92,7 +92,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback(); private Consumer brokerConsumer; - + private ReadyListener onflowControlReady; protected final AMQPSessionContext protonSession; protected final Sender sender; protected final AMQPConnectionContext connection; @@ -117,6 +117,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr * to sync the credits we have versus the credits that are being held in proton * */ private final Object creditsLock = new Object(); + private final java.util.function.Consumer executeDelivery; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, @@ -127,6 +128,7 @@ public ProtonServerSenderContext(AMQPConnectionContext connection, this.sender = sender; this.protonSession = protonSession; this.sessionSPI = server; + this.executeDelivery = this::executeDelivery; } public Object getBrokerConsumer() { @@ -164,7 +166,7 @@ public void run() { } public boolean hasCredits() { - if (!connection.flowControl(brokerConsumer::promptDelivery)) { + if (!connection.flowControl(onflowControlReady)) { return false; } @@ -488,6 +490,7 @@ public void initialise() throws Exception { boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); + onflowControlReady = brokerConsumer::promptDelivery; } catch (ActiveMQAMQPResourceLimitExceededException e1) { throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); } catch (ActiveMQSecurityException e) { @@ -747,7 +750,7 @@ public int deliverMessage(final MessageReference messageReference, final ServerC } if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) { - messageReference.setCallback(this); + messageReference.onDelivery(executeDelivery); connection.runNow((Runnable)messageReference); } else { connection.runNow(() -> executeDelivery(messageReference)); @@ -760,8 +763,7 @@ public int deliverMessage(final MessageReference messageReference, final ServerC } } - @Override - public void executeDelivery(MessageReference messageReference) { + private void executeDelivery(MessageReference messageReference) { try { if (sender.getLocalState() == EndpointState.CLOSED) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index e05a9af550ce..893e3a746ef3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -18,13 +18,13 @@ import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; @@ -75,7 +75,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private long messageSize = -1; - private MessageReferenceCallback callback; + private Consumer onDelivery; @Override public Object getProtocolData() { @@ -93,22 +93,26 @@ public Message getMessage() { } @Override - public void setCallback(MessageReferenceCallback callback) { - this.callback = callback; + public void onDelivery(Consumer onDelivery) { + assert this.onDelivery == null; + this.onDelivery = onDelivery; } + /** + * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any. + */ @Override public void run() { - MessageReferenceCallback callback = this.callback; - - try { - if (callback != null) { - callback.executeDelivery(this); + final Consumer onDelivery = this.onDelivery; + if (onDelivery != null) { + try { + onDelivery.accept(this); + } finally { + this.onDelivery = null; } - } finally { - this.callback = null; } } + @Override public synchronized PagedMessage getPagedMessage() { PagedMessage returnMessage = message != null ? message.get() : null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 886af3628509..905f93d7c1d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.server; +import java.util.function.Consumer; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -44,7 +46,14 @@ public static MessageReference createReference(Message encode, final Queue queue SimpleString getLastValueProperty(); - void setCallback(MessageReferenceCallback callback); + /** + * This is to be used in cases where a message delivery happens on an executor. + * Most MessageReference implementations will allow execution, and if it does, + * and the protocol requires an execution per message, this callback may be used. + * + * At the time of this implementation only AMQP was used. + */ + void onDelivery(Consumer callback); /** * We define this method aggregation here because on paging we need to hold the original estimate, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java deleted file mode 100644 index 4804ddee13d7..000000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server; - -/** This is to be used in cases where a message delivery happens on an executor. - * Most MessageReference implementations will allow execution, and if it does, - * and the protocol requires an execution per message, this callback may be used. - * - * At the time of this implementation only AMQP was used. */ -public interface MessageReferenceCallback { - void executeDelivery(MessageReference reference); -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 2fd70b605fb8..0ebd7a80d479 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -33,7 +34,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -50,6 +50,7 @@ * This is useful for example, for stock prices, where you're only interested in the latest value * for a particular stock */ +@SuppressWarnings("ALL") public class LastValueQueue extends QueueImpl { private final Map map = new ConcurrentHashMap<>(); @@ -238,7 +239,7 @@ private static class HolderReference implements MessageReference { } @Override - public void setCallback(MessageReferenceCallback callback) { + public void onDelivery(Consumer callback) { // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 6d32c46800fa..12acffd57975 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -17,12 +17,12 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -55,7 +55,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node onDelivery; // Static -------------------------------------------------------- @@ -88,20 +88,23 @@ public MessageReferenceImpl(final Message message, final Queue queue) { // MessageReference implementation ------------------------------- @Override - public void setCallback(MessageReferenceCallback callback) { - this.callback = callback; + public void onDelivery(Consumer onDelivery) { + assert this.onDelivery == null; + this.onDelivery = onDelivery; } + /** + * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any. + */ @Override public void run() { - MessageReferenceCallback callback = this.callback; - - try { - if (callback != null) { - callback.executeDelivery(this); + final Consumer onDelivery = this.onDelivery; + if (onDelivery != null) { + try { + onDelivery.accept(this); + } finally { + this.onDelivery = null; } - } finally { - this.callback = null; } }