From 667e48ebd348ba80e6911079e7081510389efdda Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 2 Jan 2017 15:50:53 +0100 Subject: [PATCH] ARTEMIS-905 JCtools ConcurrentMap replacement --- artemis-cli/pom.xml | 4 + .../cli/commands/tools/XmlDataExporter.java | 8 +- .../artemis/cli/process/ProcessBuilder.java | 5 +- artemis-commons/pom.xml | 4 + .../core/server/NetworkHealthCheck.java | 8 +- .../artemis/logs/AssertionLoggerHandler.java | 4 +- .../activemq/artemis/utils/FactoryFinder.java | 4 +- ...ntPropertyBeanIntrospectorWithIgnores.java | 5 +- .../artemis/utils/uri/URIFactory.java | 4 +- .../client/impl/ClientSessionFactoryImpl.java | 8 +- .../artemis/core/client/impl/Topology.java | 6 +- .../artemis/core/cluster/DiscoveryGroup.java | 4 +- .../core/impl/RemotingConnectionImpl.java | 5 +- .../remoting/impl/netty/NettyConnector.java | 4 +- .../util/TimeAndCounterIDGeneratorTest.java | 5 +- artemis-distribution/pom.xml | 4 + .../src/main/assembly/dep.xml | 1 + .../jdbc/store/file/JDBCSequentialFile.java | 4 +- .../jdbc/store/journal/JDBCJournalImpl.java | 7 +- .../jms/client/ActiveMQConnection.java | 8 +- .../jms/client/ThreadAwareContext.java | 43 +++++-- .../jndi/ActiveMQInitialContextFactory.java | 4 +- .../journal/JMSJournalStorageManagerImpl.java | 8 +- .../impl/AbstractJournalUpdateTask.java | 29 ++++- .../core/journal/impl/FileWrapperJournal.java | 5 +- .../core/journal/impl/JournalCompactor.java | 12 +- .../core/journal/impl/JournalFileImpl.java | 4 +- .../core/journal/impl/JournalImpl.java | 66 ++++++----- .../artemis-amqp-protocol/pom.xml | 4 + .../amqp/broker/AMQPConnectionCallback.java | 4 +- .../client/ProtonClientConnectionManager.java | 4 +- .../amqp/proton/AMQPConnectionContext.java | 4 +- .../protocol/mqtt/MQTTConnectionManager.java | 4 +- .../core/protocol/mqtt/MQTTSession.java | 4 +- .../core/protocol/mqtt/MQTTSessionState.java | 21 ++-- .../mqtt/MQTTSubscriptionManager.java | 13 +- .../protocol/openwire/OpenWireConnection.java | 16 +-- .../openwire/OpenWireProtocolManager.java | 4 +- .../core/protocol/stomp/StompSession.java | 8 +- .../ra/ActiveMQRAConnectionManager.java | 6 +- .../artemis/ra/inflow/ActiveMQActivation.java | 4 +- .../artemis/ra/recovery/RecoveryManager.java | 4 +- .../artemis/rest/queue/ConsumersResource.java | 4 +- .../rest/queue/QueueDestinationsResource.java | 4 +- .../rest/queue/push/PushConsumerResource.java | 4 +- .../rest/topic/PushSubscriptionsResource.java | 4 +- .../rest/topic/SubscriptionsResource.java | 4 +- .../rest/topic/TopicDestinationsResource.java | 4 +- .../cursor/impl/PageCursorProviderImpl.java | 5 +- .../cursor/impl/PageSubscriptionImpl.java | 4 +- .../artemis/core/paging/impl/Page.java | 4 +- .../core/paging/impl/PagingManagerImpl.java | 11 +- .../AbstractJournalStorageManager.java | 6 +- .../core/postoffice/impl/BindingsImpl.java | 9 +- .../postoffice/impl/DuplicateIDCacheImpl.java | 4 +- .../core/postoffice/impl/PostOfficeImpl.java | 4 +- .../postoffice/impl/SimpleAddressManager.java | 8 +- .../impl/WildcardAddressManager.java | 6 +- .../core/impl/CoreProtocolManager.java | 4 +- .../core/registry/MapBindingRegistry.java | 4 +- .../core/remoting/impl/invm/InVMAcceptor.java | 4 +- .../remoting/impl/invm/InVMConnector.java | 4 +- .../core/remoting/impl/invm/InVMRegistry.java | 6 +- .../remoting/impl/netty/NettyAcceptor.java | 4 +- .../server/impl/RemotingServiceImpl.java | 10 +- .../core/replication/ReplicationEndpoint.java | 37 +++--- .../core/security/impl/SecurityStoreImpl.java | 12 +- .../core/server/cluster/ClusterManager.java | 4 +- .../cluster/impl/ClusterConnectionImpl.java | 6 +- .../group/impl/LocalGroupingHandler.java | 6 +- .../group/impl/RemoteGroupingHandler.java | 11 +- .../core/server/impl/ActiveMQServerImpl.java | 16 +-- .../core/server/impl/LastValueQueue.java | 4 +- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../impl/ScheduledDeliveryHandlerImpl.java | 4 +- .../core/server/impl/ServerSessionImpl.java | 4 +- .../core/server/impl/ServiceRegistryImpl.java | 10 +- .../impl/ManagementServiceImpl.java | 8 +- .../impl/HierarchicalObjectRepository.java | 4 +- .../transaction/impl/ResourceManagerImpl.java | 4 +- .../group/impl/ClusteredResetMockTest.java | 4 +- .../jms/example/MessageGroupExample.java | 4 +- .../jms/example/MessageGroup2Example.java | 4 +- pom.xml | 7 ++ .../activemq/JmsRollbackRedeliveryTest.java | 8 +- .../transport/tcp/SocketTstFactory.java | 4 +- ...ConcurrentProducerDurableConsumerTest.java | 4 +- .../ConcurrentProducerQueueConsumerTest.java | 4 +- .../journal/JournalImplAppendTptBench.java | 111 ++++++++++++++++++ ...BMultipleHandlersServerDisconnectTest.java | 6 +- .../hornetq/HornetQProtocolManagerTest.java | 6 +- .../integration/client/ConsumerTest.java | 4 +- .../cluster/failover/QuorumFailOverTest.java | 4 +- .../integration/mqtt/imported/MQTTTest.java | 12 +- .../common/testjndi/TestContextFactory.java | 4 +- .../impl/fakes/FakeSequentialFileFactory.java | 4 +- .../core/paging/impl/PagingStoreImplTest.java | 7 +- 97 files changed, 510 insertions(+), 315 deletions(-) create mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplAppendTptBench.java diff --git a/artemis-cli/pom.xml b/artemis-cli/pom.xml index 06b173182c1..98c57e6d087 100644 --- a/artemis-cli/pom.xml +++ b/artemis-cli/pom.xml @@ -82,6 +82,10 @@ commons-lang3 ${commons.lang.version} + + org.jctools + jctools-core + com.sun.winsw winsw diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index 8030ce2f5a8..8e3e991e228 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -91,6 +90,7 @@ import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.jctools.maps.NonBlockingHashMap; @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends OptionalLocking { @@ -115,11 +115,11 @@ public final class XmlDataExporter extends OptionalLocking { private final HashMap queueBindings = new HashMap<>(); - private final Map jmsConnectionFactories = new ConcurrentHashMap<>(); + private final Map jmsConnectionFactories = new NonBlockingHashMap<>(); - private final Map, PersistedDestination> jmsDestinations = new ConcurrentHashMap<>(); + private final Map, PersistedDestination> jmsDestinations = new NonBlockingHashMap<>(); - private final Map, PersistedBindings> jmsJNDI = new ConcurrentHashMap<>(); + private final Map, PersistedBindings> jmsJNDI = new NonBlockingHashMap<>(); long messagesPrinted = 0L; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java index 5e4acf4df91..2c5905677b3 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/process/ProcessBuilder.java @@ -21,12 +21,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.Set; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.jctools.maps.NonBlockingHashSet; public class ProcessBuilder { - static ConcurrentHashSet processes = new ConcurrentHashSet<>(); + static Set processes = new NonBlockingHashSet<>(); static { Runtime.getRuntime().addShutdownHook(new Thread() { diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml index 86f5eab0745..bc563d2a832 100644 --- a/artemis-commons/pom.xml +++ b/artemis-commons/pom.xml @@ -60,6 +60,10 @@ com.google.guava guava + + org.jctools + jctools-core + junit junit diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java index ec98aad3144..9a6bb2cff31 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.jctools.maps.NonBlockingHashSet; import org.jboss.logging.Logger; /** @@ -44,9 +44,9 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent { private static final Logger logger = Logger.getLogger(NetworkHealthCheck.class); - private final Set componentList = new ConcurrentHashSet<>(); - private final Set addresses = new ConcurrentHashSet<>(); - private final Set urls = new ConcurrentHashSet<>(); + private final Set componentList = new NonBlockingHashSet<>(); + private final Set addresses = new NonBlockingHashSet<>(); + private final Set urls = new NonBlockingHashSet<>(); private NetworkInterface networkInterface; public static final String IPV6_DEFAULT_COMMAND = "ping6 -c 1 %2$s"; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java index ac89fc29277..e5127f10b53 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java @@ -17,11 +17,11 @@ package org.apache.activemq.artemis.logs; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import org.jboss.logmanager.ExtHandler; import org.jboss.logmanager.ExtLogRecord; +import org.jctools.maps.NonBlockingHashMap; /** * This class contains a tool where programs could intercept for LogMessage given an interval of time between {@link #startCapture()} @@ -31,7 +31,7 @@ */ public class AssertionLoggerHandler extends ExtHandler { - private static final Map messages = new ConcurrentHashMap<>(); + private static final Map messages = new NonBlockingHashMap<>(); private static boolean capture = false; /** diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java index 2dda85fdf1c..ddb999c14fd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.ConcurrentMap; public class FactoryFinder { @@ -52,7 +52,7 @@ public interface ObjectFactory { */ protected static class StandaloneObjectFactory implements ObjectFactory { - final ConcurrentMap classMap = new ConcurrentHashMap<>(); + final ConcurrentMap classMap = new NonBlockingHashMap<>(); @Override public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/FluentPropertyBeanIntrospectorWithIgnores.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/FluentPropertyBeanIntrospectorWithIgnores.java index 7df91312551..19cfb5ded96 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/FluentPropertyBeanIntrospectorWithIgnores.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/FluentPropertyBeanIntrospectorWithIgnores.java @@ -21,18 +21,19 @@ import java.beans.PropertyDescriptor; import java.lang.reflect.Method; import java.util.Locale; +import java.util.Set; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.commons.beanutils.FluentPropertyBeanIntrospector; import org.apache.commons.beanutils.IntrospectionContext; import org.jboss.logging.Logger; +import org.jctools.maps.NonBlockingHashSet; public class FluentPropertyBeanIntrospectorWithIgnores extends FluentPropertyBeanIntrospector { static Logger logger = Logger.getLogger(FluentPropertyBeanIntrospectorWithIgnores.class); - private static ConcurrentHashSet> ignores = new ConcurrentHashSet<>(); + private static Set> ignores = new NonBlockingHashSet<>(); public static void addIgnore(String className, String propertyName) { logger.trace("Adding ignore on " + className + "/" + propertyName); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java index bc62dd2e02c..54294716e10 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java @@ -20,13 +20,13 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; public class URIFactory { private URI defaultURI; - private final Map> schemas = new ConcurrentHashMap<>(); + private final Map> schemas = new NonBlockingHashMap<>(); public URI getDefaultURI() { return defaultURI; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index addbbbde733..1a16a74c15b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -62,12 +62,12 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler; import org.apache.activemq.artemis.utils.ClassloadingUtil; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; +import org.jctools.maps.NonBlockingHashSet; public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener { @@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final long connectionTTL; - private final Set sessions = new ConcurrentHashSet<>(); + private final Set sessions = new NonBlockingHashSet<>(); private final Object createSessionLock = new Object(); @@ -121,9 +121,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private int reconnectAttempts; - private final Set listeners = new ConcurrentHashSet<>(); + private final Set listeners = new NonBlockingHashSet<>(); - private final Set failoverListeners = new ConcurrentHashSet<>(); + private final Set failoverListeners = new NonBlockingHashSet<>(); private Connector connector; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java index 096fd66c451..829d99359de 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -73,7 +73,7 @@ public Topology(final Object owner) { public Topology(final Object owner, final Executor executor) { this.topologyListeners = new HashSet<>(); - this.topology = new ConcurrentHashMap<>(); + this.topology = new NonBlockingHashMap<>(); if (executor == null) { throw new IllegalArgumentException("Executor is required"); } @@ -450,7 +450,7 @@ public String toString() { private synchronized Map getMapDelete() { if (mapDelete == null) { - mapDelete = new ConcurrentHashMap<>(); + mapDelete = new NonBlockingHashMap<>(); } return mapDelete; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java index 7c40602de69..4b8a1e1d64d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java @@ -21,7 +21,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -60,7 +60,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { private final Object waitLock = new Object(); - private final Map connectors = new ConcurrentHashMap<>(); + private final Map connectors = new NonBlockingHashMap<>(); private final long timeout; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 8bd62ca4e6d..f97faab22ff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -18,9 +18,7 @@ import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -41,6 +39,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.jboss.logging.Logger; +import org.jctools.maps.NonBlockingHashMapLong; public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection { @@ -48,7 +47,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement private final PacketDecoder packetDecoder; - private final Map channels = new ConcurrentHashMap<>(); + private final NonBlockingHashMapLong channels = new NonBlockingHashMapLong<>(); private final long blockingCallTimeout; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index c809b966251..3f544d58be1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -37,7 +37,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -209,7 +209,7 @@ public class NettyConnector extends AbstractConnector { private long batchDelay; - private ConcurrentMap connections = new ConcurrentHashMap<>(); + private ConcurrentMap connections = new NonBlockingHashMap<>(); private String servletPath; diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/TimeAndCounterIDGeneratorTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/TimeAndCounterIDGeneratorTest.java index c2ec02d7c98..29d8875cabe 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/TimeAndCounterIDGeneratorTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/TimeAndCounterIDGeneratorTest.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.util; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator; +import org.jctools.maps.NonBlockingHashSet; import org.junit.Assert; import org.junit.Test; @@ -73,7 +74,7 @@ public void testCalculationRefresh() { @Test public void testCalculationOnMultiThread() throws Throwable { - final ConcurrentHashSet hashSet = new ConcurrentHashSet<>(); + final Set hashSet = new NonBlockingHashSet<>(); final TimeAndCounterIDGenerator seq = new TimeAndCounterIDGenerator(); diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 1236f17c0ee..04ae6c7c597 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -201,6 +201,10 @@ org.apache.johnzon johnzon-core + + org.jctools + jctools-core + diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 92c984cce6b..4c42e41db5a 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -96,6 +96,7 @@ org.jgroups:jgroups org.apache.geronimo.specs:geronimo-json_1.0_spec org.apache.johnzon:johnzon-core + org.jctools:jctools-core + diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java index 23e7781e59f..fc3173acc63 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java @@ -26,7 +26,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.broker.BrokerService; @@ -112,7 +112,7 @@ public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throw // Consume messages and rollback transactions { AtomicInteger received = new AtomicInteger(); - Map rolledback = new ConcurrentHashMap<>(); + Map rolledback = new NonBlockingHashMap<>(); while (received.get() < nbMessages) { Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); @@ -148,7 +148,7 @@ public void testRedeliveryOnSingleConsumer() throws Exception { // Consume messages and rollback transactions { AtomicInteger received = new AtomicInteger(); - Map rolledback = new ConcurrentHashMap<>(); + Map rolledback = new NonBlockingHashMap<>(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(destination); @@ -182,7 +182,7 @@ public void testRedeliveryOnSingleSession() throws Exception { // Consume messages and rollback transactions { AtomicInteger received = new AtomicInteger(); - Map rolledback = new ConcurrentHashMap<>(); + Map rolledback = new NonBlockingHashMap<>(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); while (received.get() < nbMessages) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java index 68c1bc287c4..08853acd92a 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java @@ -22,7 +22,7 @@ import java.net.Socket; import java.net.UnknownHostException; import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; @@ -35,7 +35,7 @@ public class SocketTstFactory extends SocketFactory { private static final Logger LOG = LoggerFactory.getLogger(SocketTstFactory.class); - private static final ConcurrentMap closeIter = new ConcurrentHashMap<>(); + private static final ConcurrentMap closeIter = new NonBlockingHashMap<>(); private class SocketTst { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index 0df68752978..96ea26e4c61 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -36,7 +36,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -395,7 +395,7 @@ class TimedMessageListener implements MessageListener { long batchReceiptAccumulator = 0; long maxReceiptTime = 0; AtomicLong count = new AtomicLong(0); - Map messageLists = new ConcurrentHashMap<>(new HashMap()); + Map messageLists = new NonBlockingHashMap<>(new HashMap()); @Override public void onMessage(Message message) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java index 9631a0c3f4e..3cfeb3620a7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java @@ -34,7 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -349,7 +349,7 @@ static class TimedMessageListener implements MessageListener { long batchReceiptAccumulator = 0; long maxReceiptTime = 0; - final Map messageLists = new ConcurrentHashMap<>(new HashMap()); + final Map messageLists = new NonBlockingHashMap<>(new HashMap()); @Override public void onMessage(Message message) { diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplAppendTptBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplAppendTptBench.java new file mode 100644 index 00000000000..375cd55dd25 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplAppendTptBench.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.benchmarks.journal; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.DummyCallback; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; + +public class JournalImplAppendTptBench { + + private static final int FILE_SIZE = 1024 * 1024 * 1024; + private static final int ITERATIONS = 100_000; + private static final int WARMUP_ITERATIONS = 20_000; + private static final int TESTS = 10; + private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS); + private static int ENCODED_SIZE = 8; + private static final EncodingSupport encodingSupport = new EncodingSupport() { + @Override + public int getEncodeSize() { + return ENCODED_SIZE; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + buffer.writerIndex(buffer.writerIndex() + ENCODED_SIZE); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + throw new UnsupportedOperationException(); + } + }; + private static byte RECORD_TYPE = (byte) 1; + + public static void main(String[] args) throws Exception { + int numFiles = (int) ((TOTAL_MESSAGES * 1024 + 512) / FILE_SIZE * 1.3); + if (numFiles < 2) { + numFiles = 2; + } + final File journalDir = new File(System.getProperty("java.io.tmpdir")); + final SequentialFileFactory sequentialFileFactory = new NIOSequentialFileFactory(journalDir, false, 0, 0, 1, false, (code, message, file) -> { + }) { + + private final ByteBuffer buffer = ByteBuffer.allocateDirect(UnsafeAccess.UNSAFE.pageSize()); + + @Override + public ByteBuffer newBuffer(int size) { + //little trick to reduce GC pressure + if (size > buffer.capacity()) + throw new IllegalStateException("IMPOSSIBLE!"); + this.buffer.clear(); + this.buffer.limit(size); + return this.buffer; + } + + @Override + public boolean isSupportsCallbacks() { + return true; + } + }; + final JournalImpl journal = new JournalImpl(() -> Runnable::run, FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", 1, 0); + journal.start(); + journal.load(new ArrayList<>(), null, null); + try { + long id = 0; + for (int i = 0; i < WARMUP_ITERATIONS; i++) { + journal.appendAddRecord(id, RECORD_TYPE, encodingSupport, false, DummyCallback.getInstance()); + id++; + } + for (int t = 0; t < TESTS; t++) { + System.gc(); + final long start = System.nanoTime(); + for (int i = 0; i < ITERATIONS; i++) { + journal.appendAddRecord(id, RECORD_TYPE, encodingSupport, false, DummyCallback.getInstance()); + id++; + } + final long elapsed = System.nanoTime() - start; + System.out.println((ITERATIONS * 1000_000_000L) / elapsed); + } + } finally { + journal.stop(); + for (File file : journalDir.listFiles()) { + file.deleteOnExit(); + } + } + } + +} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java index 2b6868f32d3..9010e4d82b9 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java @@ -31,7 +31,9 @@ import java.lang.reflect.Method; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; + +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -64,7 +66,7 @@ */ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase { - final ConcurrentHashMap mapCounter = new ConcurrentHashMap<>(); + final ConcurrentMap mapCounter = new NonBlockingHashMap<>(); volatile ActiveMQResourceAdapter resourceAdapter; diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java index 29542c9b932..60fd888aba9 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java @@ -23,7 +23,7 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -85,8 +85,8 @@ public void testLegacy() throws Exception { connectionFactory2.createConnection().close(); RecoveryManager manager = new RecoveryManager(); - manager.register(connectionFactory, null, null, new ConcurrentHashMap()); - manager.register(connectionFactory2, null, null, new ConcurrentHashMap()); + manager.register(connectionFactory, null, null, new NonBlockingHashMap()); + manager.register(connectionFactory2, null, null, new NonBlockingHashMap()); for (XARecoveryConfig resource : manager.getResources()) { try (ServerLocator locator = resource.createServerLocator(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 1c1f929dab3..62e4072e099 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.jctools.maps.NonBlockingHashSet; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -371,7 +371,7 @@ public void onMessage(final ClientMessage msg) { @Test public void testReceiveAndResend() throws Exception { - final Set sessions = new ConcurrentHashSet<>(); + final Set sessions = new NonBlockingHashSet<>(); final AtomicInteger errors = new AtomicInteger(0); final SimpleString QUEUE_RESPONSE = SimpleString.toSimpleString("QUEUE_RESPONSE"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java index 6043f888ca7..2b98533ff1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.Pair; @@ -102,7 +102,7 @@ protected boolean isSharedStorage() { private static class TopologyListener implements ClusterTopologyListener { final String prefix; - final Map> nodes = new ConcurrentHashMap<>(); + final Map> nodes = new NonBlockingHashMap<>(); private TopologyListener(String string) { prefix = string; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 79029be9058..ad6c39d0cf1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -51,12 +51,12 @@ import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.fusesource.mqtt.codec.PUBLISH; +import org.jctools.maps.NonBlockingHashSet; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.vertx.java.core.impl.ConcurrentHashSet; /** * QT @@ -73,11 +73,11 @@ public class MQTTTest extends MQTTTestSupport { public void setUp() throws Exception { Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); sessions.setAccessible(true); - sessions.set(null, new ConcurrentHashMap<>()); + sessions.set(null, new NonBlockingHashMap<>()); Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); connectedClients.setAccessible(true); - connectedClients.set(null, new ConcurrentHashSet<>()); + connectedClients.set(null, new NonBlockingHashSet<>()); super.setUp(); } @@ -825,7 +825,7 @@ public void testPacketIdGeneratorNonCleanSession() throws Exception { final MQTT mqtt = createMQTTConnection("nonclean-packetid", false); mqtt.setKeepAlive((short) 15); - final Map publishMap = new ConcurrentHashMap<>(); + final Map publishMap = new NonBlockingHashMap<>(); mqtt.setTracer(new Tracer() { @Override public void onReceive(MQTTFrame frame) { @@ -899,7 +899,7 @@ public void onSend(MQTTFrame frame) { // If there is a good reason for this we should follow ActiveMQ. public void testPacketIdGeneratorCleanSession() throws Exception { final String[] cleanClientIds = new String[]{"", "clean-packetid", null}; - final Map publishMap = new ConcurrentHashMap<>(); + final Map publishMap = new NonBlockingHashMap<>(); MQTT[] mqtts = new MQTT[cleanClientIds.length]; for (int i = 0; i < cleanClientIds.length; i++) { mqtts[i] = createMQTTConnection("", true); diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/testjndi/TestContextFactory.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/testjndi/TestContextFactory.java index 15cc51cb8bf..dffe117d8ac 100644 --- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/testjndi/TestContextFactory.java +++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/testjndi/TestContextFactory.java @@ -24,7 +24,7 @@ import javax.naming.spi.InitialContextFactory; import java.util.Hashtable; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.jndi.LazyCreateContext; @@ -50,7 +50,7 @@ public class TestContextFactory implements InitialContextFactory { @Override public Context getInitialContext(Hashtable environment) throws NamingException { // lets create a factory - Map data = new ConcurrentHashMap<>(); + Map data = new NonBlockingHashMap<>(); for (Map.Entry entry : environment.entrySet()) { String key = entry.getKey().toString(); if (key.startsWith(connectionFactoryPrefix)) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index 0316945bc3a..a789072fb81 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.jctools.maps.NonBlockingHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -35,7 +35,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { - private final Map fileMap = new ConcurrentHashMap<>(); + private final Map fileMap = new NonBlockingHashMap<>(); private final int alignment; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 923e7195606..1e81da5eba7 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -20,8 +20,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -62,6 +60,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.jctools.maps.NonBlockingHashMapLong; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -374,7 +373,7 @@ protected void testConcurrentPaging(final SequentialFileFactory factory, final CountDownLatch latchStart = new CountDownLatch(numberOfThreads); - final ConcurrentHashMap buffers = new ConcurrentHashMap<>(); + final NonBlockingHashMapLong buffers = new NonBlockingHashMapLong<>(); final ArrayList readPages = new ArrayList<>(); @@ -477,7 +476,7 @@ public void run() { throw consumer.e; } - final ConcurrentMap buffers2 = new ConcurrentHashMap<>(); + final NonBlockingHashMapLong buffers2 = new NonBlockingHashMapLong<>(); for (Page page : readPages) { page.open();