From 768063c2b75b99f3d805daca7f20862d09671f33 Mon Sep 17 00:00:00 2001 From: Andrey Kuznetsov Date: Mon, 5 Mar 2018 22:54:25 +0300 Subject: [PATCH] ignite-7517 Almost all usages of ConcurrentLinkedDeque8 class are removed Signed-off-by: Andrey Gura --- .../internal/igfs/common/IgfsLogger.java | 8 +- .../managers/communication/GridIoManager.java | 18 +- .../deployment/GridDeploymentLocalStore.java | 27 +- .../cache/GridCacheMvccManager.java | 7 +- .../cache/GridDeferredAckMessageSender.java | 10 +- .../dht/GridDhtLocalPartition.java | 5 +- .../CacheContinuousQueryEventBuffer.java | 5 +- .../store/GridCacheWriteBehindStore.java | 11 +- .../cache/transactions/IgniteTxManager.java | 3 +- .../GridContinuousBatchAdapter.java | 5 +- .../processors/task/GridTaskWorker.java | 4 +- .../ignite/internal/util/lang/GridFunc.java | 18 - .../ConcurrentDequeFactoryCallable.java | 40 -- .../util/nio/GridSelectorNioSessionImpl.java | 5 +- .../jobstealing/JobStealingCollisionSpi.java | 4 +- .../tcp/TcpCommunicationSpi.java | 4 +- .../ignite/util/deque/FastSizeDeque.java | 372 ++++++++++++++++++ .../resources/META-INF/classnames.properties | 39 +- ...dCacheReplicatedSynchronousCommitTest.java | 4 +- .../future/GridCompoundFutureSelfTest.java | 8 +- .../GridFutureListenPerformanceTest.java | 4 +- .../util/future/GridFutureQueueTest.java | 5 +- .../lang/GridFutureListenPerformanceTest.java | 4 +- .../GridCircularBufferPerformanceTest.java | 5 +- .../utils/GridCircularBufferSelfTest.java | 9 +- .../tcp/GridTcpCommunicationSpiLanTest.java | 13 +- ...CommunicationSpiMultithreadedSelfTest.java | 30 +- .../external/HadoopExternalTaskExecutor.java | 4 +- .../HadoopExternalCommunication.java | 4 +- 29 files changed, 506 insertions(+), 169 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ConcurrentDequeFactoryCallable.java create mode 100644 modules/core/src/main/java/org/apache/ignite/util/deque/FastSizeDeque.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsLogger.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsLogger.java index f95fd4c378978..816fa8e01f894 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsLogger.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsLogger.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -36,8 +38,6 @@ import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; -import java.util.concurrent.ConcurrentHashMap; -import org.jsr166.ConcurrentLinkedDeque8; /** * IGFS client logger writing data to the file. @@ -234,7 +234,7 @@ private IgfsLogger(String endpoint, String igfsName, String dir, int batchSize) file = new File(dirFile, "igfs-log-" + igfsName + "-" + pid + ".csv"); - entries = new ConcurrentLinkedDeque8<>(); + entries = new ConcurrentLinkedDeque<>(); cnt = new AtomicInteger(); useCnt = new AtomicInteger(); @@ -727,7 +727,7 @@ private void flush() { try { entries0 = entries; - entries = new ConcurrentLinkedDeque8<>(); + entries = new ConcurrentLinkedDeque<>(); } finally { rwLock.writeLock().unlock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b2e9735041a02..d5cdd2dd7fc53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -96,7 +97,6 @@ import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -164,8 +164,7 @@ public class GridIoManager extends GridManagerAdapter> waitMap = - new ConcurrentHashMap<>(); + private final ConcurrentMap> waitMap = new ConcurrentHashMap<>(); /** Communication message listener. */ private CommunicationListener commLsnr; @@ -774,7 +773,7 @@ private void format(StringBuilder b, Collection> pairs, SimpleD lock.writeLock().lock(); try { - ConcurrentLinkedDeque8 waitList = waitMap.remove(nodeId); + Deque waitList = waitMap.remove(nodeId); if (log.isDebugEnabled()) log.debug("Removed messages from discovery startup delay list " + @@ -804,9 +803,9 @@ private void format(StringBuilder b, Collection> pairs, SimpleD try { started = true; - for (Entry> e : waitMap.entrySet()) { + for (Entry> e : waitMap.entrySet()) { if (ctx.discovery().node(e.getKey()) != null) { - ConcurrentLinkedDeque8 waitList = waitMap.remove(e.getKey()); + Deque waitList = waitMap.remove(e.getKey()); if (log.isDebugEnabled()) log.debug("Processing messages from discovery startup delay list: " + waitList); @@ -954,8 +953,11 @@ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) { log.debug("Adding message to waiting list [senderId=" + nodeId + ", msg=" + msg + ']'); - ConcurrentLinkedDeque8 list = - F.addIfAbsent(waitMap, nodeId, F.newDeque()); + Deque list = F.>addIfAbsent( + waitMap, + nodeId, + ConcurrentLinkedDeque::new + ); assert list != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java index 14581f6b43c5c..b27cc4bd0275f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -26,6 +27,7 @@ import java.util.Map.Entry; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.compute.ComputeTask; @@ -46,7 +48,6 @@ import org.apache.ignite.spi.deployment.DeploymentResource; import org.apache.ignite.spi.deployment.DeploymentSpi; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED; import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED; @@ -60,8 +61,7 @@ */ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { /** Deployment cache by class name. */ - private final ConcurrentMap> cache = - new ConcurrentHashMap<>(); + private final ConcurrentMap> cache = new ConcurrentHashMap<>(); /** Mutex. */ private final Object mux = new Object(); @@ -110,7 +110,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { Collection deps = new ArrayList<>(); synchronized (mux) { - for (ConcurrentLinkedDeque8 depList : cache.values()) + for (Deque depList : cache.values()) for (GridDeployment d : depList) if (!deps.contains(d)) deps.add(d); @@ -122,7 +122,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { /** {@inheritDoc} */ @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) { synchronized (mux) { - for (ConcurrentLinkedDeque8 deps : cache.values()) + for (Deque deps : cache.values()) for (GridDeployment dep : deps) if (dep.classLoaderId().equals(ldrId)) return dep; @@ -232,7 +232,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { * @return Deployment. */ @Nullable private GridDeployment deployment(String alias) { - ConcurrentLinkedDeque8 deps = cache.get(alias); + Deque deps = cache.get(alias); if (deps != null) { GridDeployment dep = deps.peekFirst(); @@ -260,10 +260,10 @@ private GridDeployment deploy(DeploymentMode depMode, ClassLoader ldr, Class boolean fireEvt = false; try { - ConcurrentLinkedDeque8 cachedDeps = null; + Deque cachedDeps = null; // Find existing class loader info. - for (ConcurrentLinkedDeque8 deps : cache.values()) { + for (Deque deps : cache.values()) { for (GridDeployment d : deps) { if (d.classLoader() == ldr) { // Cache class and alias. @@ -304,8 +304,11 @@ private GridDeployment deploy(DeploymentMode depMode, ClassLoader ldr, Class assert fireEvt : "Class was not added to newly created deployment [cls=" + cls + ", depMode=" + depMode + ", dep=" + dep + ']'; - ConcurrentLinkedDeque8 deps = - F.addIfAbsent(cache, alias, F.newDeque()); + Deque deps = F.>addIfAbsent( + cache, + alias, + ConcurrentLinkedDeque::new + ); if (!deps.isEmpty()) { for (GridDeployment d : deps) { @@ -512,8 +515,8 @@ private void undeploy(ClassLoader ldr) { Collection doomed = new HashSet<>(); synchronized (mux) { - for (Iterator> i1 = cache.values().iterator(); i1.hasNext();) { - ConcurrentLinkedDeque8 deps = i1.next(); + for (Iterator> i1 = cache.values().iterator(); i1.hasNext();) { + Deque deps = i1.next(); for (Iterator i2 = deps.iterator(); i2.hasNext();) { GridDeployment dep = i2.next(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index a1c1792a703f9..a9fa3c777a26b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -63,9 +65,8 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.util.deque.FastSizeDeque; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.ConcurrentHashMap; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -122,7 +123,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap near2dht = newMap(); /** Finish futures. */ - private final ConcurrentLinkedDeque8 finishFuts = new ConcurrentLinkedDeque8<>(); + private final FastSizeDeque finishFuts = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); /** Nested listener calls. */ private final ThreadLocal nestedLsnrCalls = new ThreadLocal() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java index c3dbf3f07f662..6e353be69340f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -26,8 +29,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; -import java.util.concurrent.ConcurrentHashMap; -import org.jsr166.ConcurrentLinkedDeque8; +import org.apache.ignite.util.deque.FastSizeDeque; /** * @@ -66,7 +68,7 @@ public GridDeferredAckMessageSender(GridTimeoutProcessor time, * @param nodeId Node ID. * @param vers Versions to send. */ - public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8 vers); + public abstract void finish(UUID nodeId, Collection vers); /** * @@ -116,7 +118,7 @@ private class DeferredAckMessageBuffer extends ReentrantReadWriteLock implements private AtomicBoolean guard = new AtomicBoolean(false); /** Versions. */ - private ConcurrentLinkedDeque8 vers = new ConcurrentLinkedDeque8<>(); + private FastSizeDeque vers = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); /** Node ID. */ private final UUID nodeId; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 077ccf8ccbcaa..89ce6ddb4f958 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -56,7 +57,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; +import org.apache.ignite.util.deque.FastSizeDeque; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; @@ -139,7 +140,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** Remove queue. */ @GridToStringExclude - private final ConcurrentLinkedDeque8 rmvQueue = new ConcurrentLinkedDeque8<>(); + private final FastSizeDeque rmvQueue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); /** Group reservations. */ @GridToStringExclude diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 7a7c04519238c..2fc69938942b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -24,12 +24,13 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; +import org.apache.ignite.util.deque.FastSizeDeque; /** * @@ -49,7 +50,7 @@ public class CacheContinuousQueryEventBuffer { private AtomicReference curBatch = new AtomicReference<>(); /** */ - private ConcurrentLinkedDeque8 backupQ = new ConcurrentLinkedDeque8<>(); + private FastSizeDeque backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); /** */ private ConcurrentSkipListMap pending = new ConcurrentSkipListMap<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java index d7a13e668a25e..44cadd6b0e12a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -48,8 +49,8 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.util.deque.FastSizeDeque; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.ConcurrentLinkedHashMap; import static javax.cache.Cache.Entry; @@ -879,7 +880,7 @@ private void wakeUp() { */ private class Flusher extends GridWorker { /** Queue to flush. */ - private final ConcurrentLinkedDeque8>> queue; + private final FastSizeDeque>> queue; /** Flusher write map. */ private final ConcurrentHashMap> flusherWriteMap; @@ -917,7 +918,7 @@ protected Flusher(String igniteInstanceName, flusherWriteMap = null; } else { - queue = new ConcurrentLinkedDeque8<>(); + queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl); } } @@ -937,8 +938,8 @@ protected void start() { */ private void putToFlusherWriteCache( K key, - StatefulValue newVal) - throws IgniteInterruptedCheckedException { + StatefulValue newVal + ) throws IgniteInterruptedCheckedException { assert !writeCoalescing : "Unexpected write coalescing."; if (queue.sizex() > flusherCacheCriticalSize) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index dde70bc4e982e..fbdeca10eb46f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -90,7 +90,6 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; @@ -227,7 +226,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; } - @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8 vers) { + @Override public void finish(UUID nodeId, Collection vers) { GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers); cctx.kernalContext().gateway().readLock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java index 597eae84c41b6..265388fc05d58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -18,14 +18,15 @@ package org.apache.ignite.internal.processors.continuous; import java.util.Collection; -import org.jsr166.ConcurrentLinkedDeque8; +import java.util.concurrent.ConcurrentLinkedDeque; +import org.apache.ignite.util.deque.FastSizeDeque; /** * Continuous routine batch adapter. */ public class GridContinuousBatchAdapter implements GridContinuousBatch { /** Buffer. */ - protected final ConcurrentLinkedDeque8 buf = new ConcurrentLinkedDeque8<>(); + protected final FastSizeDeque buf = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); /** {@inheritDoc} */ @Override public void add(Object obj) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 25f3029228195..5693eed58910a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -28,6 +28,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -85,7 +86,6 @@ import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.TaskContinuousMapperResource; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER; import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT; @@ -177,7 +177,7 @@ private enum State { private ComputeTask task; /** */ - private final Queue delayedRess = new ConcurrentLinkedDeque8<>(); + private final Queue delayedRess = new ConcurrentLinkedDeque<>(); /** */ private boolean continuous; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 3048f7d364133..ef32e3114d72e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -58,7 +58,6 @@ import org.apache.ignite.internal.util.lang.gridfunc.CacheEntryGetValueClosure; import org.apache.ignite.internal.util.lang.gridfunc.CacheEntryHasPeekPredicate; import org.apache.ignite.internal.util.lang.gridfunc.ClusterNodeGetIdClosure; -import org.apache.ignite.internal.util.lang.gridfunc.ConcurrentDequeFactoryCallable; import org.apache.ignite.internal.util.lang.gridfunc.ConcurrentHashSetFactoryCallable; import org.apache.ignite.internal.util.lang.gridfunc.ConcurrentMapFactoryCallable; import org.apache.ignite.internal.util.lang.gridfunc.ContainsNodeIdsPredicate; @@ -104,7 +103,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; /** * Contains factory and utility methods for {@code closures}, {@code predicates}, and {@code tuples}. @@ -138,9 +136,6 @@ public class GridFunc { /** */ private static final IgnitePredicate ALWAYS_FALSE = new AlwaysFalsePredicate<>(); - /** */ - private static final IgniteCallable DEQUE_FACTORY = new ConcurrentDequeFactoryCallable(); - /** */ private static final IgnitePredicate IS_NOT_NULL = new IsNotNullPredicate(); @@ -1275,19 +1270,6 @@ public static boolean isNotEmptyDirectory(Path dir) { } } - /** - * Returns a factory closure that creates new {@link ConcurrentLinkedDeque8} instance. - * Note that this method does not create a new closure but returns a static one. - * - * @param Type parameters for the created {@link List}. - * @return Factory closure that creates new {@link List} instance every - * time its {@link org.apache.ignite.lang.IgniteOutClosure#apply()} method is called. - */ - @SuppressWarnings("unchecked") - public static IgniteCallable> newDeque() { - return (IgniteCallable>)DEQUE_FACTORY; - } - /** * Returns a factory closure that creates new {@link AtomicInteger} instance * initialized to {@code zero}. Note that this method does not create a new diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ConcurrentDequeFactoryCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ConcurrentDequeFactoryCallable.java deleted file mode 100644 index 8af7fc864c5c9..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ConcurrentDequeFactoryCallable.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.lang.gridfunc; - -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteCallable; -import org.jsr166.ConcurrentLinkedDeque8; - -/** - * Deque factory. - */ -public class ConcurrentDequeFactoryCallable implements IgniteCallable { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public ConcurrentLinkedDeque8 call() { - return new ConcurrentLinkedDeque8(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ConcurrentDequeFactoryCallable.class, this); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 71a3b8db69802..d30b122cb2970 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteLogger; @@ -30,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; +import org.apache.ignite.util.deque.FastSizeDeque; /** * Session implementation bound to selector API and socket API. @@ -39,7 +40,7 @@ */ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment { /** Pending write requests. */ - private final ConcurrentLinkedDeque8 queue = new ConcurrentLinkedDeque8<>(); + private final FastSizeDeque queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); /** Selection key associated with this session. */ @GridToStringExclude diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java index 4ab361ee3d403..7cda691d45aa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java @@ -30,6 +30,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteLogger; @@ -56,7 +57,6 @@ import org.apache.ignite.spi.collision.CollisionExternalListener; import org.apache.ignite.spi.collision.CollisionJobContext; import org.apache.ignite.spi.collision.CollisionSpi; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -288,7 +288,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi private final ConcurrentMap rcvMsgMap = new ConcurrentHashMap<>(); /** */ - private final Queue nodeQueue = new ConcurrentLinkedDeque8<>(); + private final Queue nodeQueue = new ConcurrentLinkedDeque<>(); /** */ private CollisionExternalListener extLsnr; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 27f5e3ee61ee1..c5f366bfb8d26 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -42,6 +42,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -140,7 +141,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -1096,7 +1096,7 @@ class ConnectClosure implements IgniteInClosure { private ShmemAcceptWorker shmemAcceptWorker; /** Shared memory workers. */ - private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); + private final Collection shmemWorkers = new ConcurrentLinkedDeque<>(); /** Clients. */ private final ConcurrentMap clients = GridConcurrentFactory.newMap(); diff --git a/modules/core/src/main/java/org/apache/ignite/util/deque/FastSizeDeque.java b/modules/core/src/main/java/org/apache/ignite/util/deque/FastSizeDeque.java new file mode 100644 index 0000000000000..0e07d7fe3a4ae --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/util/deque/FastSizeDeque.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util.deque; + +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Consumer; +import java.util.function.Predicate; +import org.jetbrains.annotations.NotNull; + +/** + * {@link java.util.Deque} decorator that uses {@link LongAdder} for faster size computation. + *

+ * Implementation is thread-safe if underlying {@link Deque} is thread-safe. + * + * @param Deque element type + */ +public class FastSizeDeque implements Deque { + + /** */ + private class Iter implements Iterator { + /** */ + private final Iterator iter; + + /** */ + private Iter(Iterator iter) { + this.iter = iter; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return iter.hasNext(); + } + + /** {@inheritDoc} */ + @Override public E next() { + return iter.next(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + iter.remove(); + + adder.decrement(); + } + + /** {@inheritDoc} */ + @Override public void forEachRemaining(Consumer consumer) { + iter.forEachRemaining(consumer); + } + } + + /** Underlying deque. */ + private final Deque deque; + + /** Size. */ + private final LongAdder adder = new LongAdder(); + + /** Creates a decorator. + * + * @param deque Deque being decorated. + */ + public FastSizeDeque(Deque deque) { + this.deque = Objects.requireNonNull(deque); + } + + /** + * Fast size getter. + * + * @return Deque size. + */ + public int sizex() { + return adder.intValue(); + } + + /** + * Tests this deque for emptiness.; equivalent to {@code sizex() == 0}. + * + * @return {@code True} if this deque is empty. + */ + public boolean isEmptyx() { + return adder.intValue() == 0; + } + + /** {@inheritDoc} */ + @Override public void addFirst(E e) { + deque.addFirst(e); + + adder.increment(); + } + + /** {@inheritDoc} */ + @Override public void addLast(E e) { + deque.addLast(e); + + adder.increment(); + } + + /** {@inheritDoc} */ + @Override public boolean offerFirst(E e) { + boolean res = deque.offerFirst(e); + + if (res) + adder.increment(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean offerLast(E e) { + boolean res = deque.offerLast(e); + + if (res) + adder.increment(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E removeFirst() { + E res = deque.removeFirst(); + + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E removeLast() { + E res = deque.removeLast(); + + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E pollFirst() { + E res = deque.pollFirst(); + + if (res != null) + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E pollLast() { + E res = deque.pollFirst(); + + if (res != null) + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E getFirst() { + return deque.getFirst(); + } + + /** {@inheritDoc} */ + @Override public E getLast() { + return deque.getLast(); + } + + /** {@inheritDoc} */ + @Override public E peekFirst() { + return deque.peekFirst(); + } + + /** {@inheritDoc} */ + @Override public E peekLast() { + return deque.peekLast(); + } + + /** {@inheritDoc} */ + @Override public boolean removeFirstOccurrence(Object o) { + boolean res = deque.removeFirstOccurrence(o); + + if (res) + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean removeLastOccurrence(Object o) { + boolean res = deque.removeLastOccurrence(o); + + if (res) + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean add(E e) { + boolean alwaysTrue = deque.add(e); + + adder.increment(); + + return alwaysTrue; + } + + /** {@inheritDoc} */ + @Override public boolean offer(E e) { + boolean res = deque.offer(e); + + if (res) + adder.increment(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E remove() { + E res = deque.remove(); + + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E poll() { + E res = deque.poll(); + + if (res != null) + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public E element() { + return deque.element(); + } + + /** {@inheritDoc} */ + @Override public E peek() { + return deque.peek(); + } + + /** {@inheritDoc} */ + @Override public void push(E e) { + deque.push(e); + + adder.increment(); + } + + /** {@inheritDoc} */ + @Override public E pop() { + E res = deque.pop(); + + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + boolean res = deque.remove(o); + + if (res) + adder.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean containsAll(@NotNull Collection col) { + return deque.containsAll(col); + } + + /** + * Adds all of the elements in the specified collection at the end of this deque. + *

+ * Note: If collection being added can mutate concurrently, or underlying deque implementation allows partial + * insertion, then subsequent calls to {@link #size()} can report incorrect value. + * + * @param col The elements to be inserted into this deque. + * @return {@code true} if this deque changed as a result of the call. + */ + @Override public boolean addAll(@NotNull Collection col) { + int colSize = col.size(); + + boolean res = deque.addAll(col); + + if (res) + adder.add(colSize); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(@NotNull Collection col) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean removeIf(Predicate pred) { + // Default implementation in Collection works through iterator, hence the adder is kept consistent. + // But Deque implementations can override default behavior, so we'd better to prohibit the operation. + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(@NotNull Collection col) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void clear() { + while (pollFirst() != null) { + // No-op. + } + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + return deque.contains(o); + } + + /** {@inheritDoc} */ + @Override public int size() { + return deque.size(); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return deque.isEmpty(); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator iterator() { + return new Iter(deque.iterator()); + } + + /** {@inheritDoc} */ + @NotNull @Override public Object[] toArray() { + return deque.toArray(); + } + + /** {@inheritDoc} */ + @NotNull @Override public T[] toArray(@NotNull T[] ts) { + return deque.toArray(ts); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator descendingIterator() { + return new Iter(deque.descendingIterator()); + } +} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index cb708575ad374..1de3633fff6a2 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -25,7 +25,6 @@ org.apache.ignite.IgniteException org.apache.ignite.IgniteIllegalStateException org.apache.ignite.IgniteInterruptedException org.apache.ignite.IgniteState -org.apache.ignite.IgniteSystemProperties$1 org.apache.ignite.binary.BinaryInvalidTypeException org.apache.ignite.binary.BinaryObject org.apache.ignite.binary.BinaryObjectException @@ -67,8 +66,10 @@ org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy$DefaultHolderCompar org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy$GridConcurrentSkipListSetEx org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy$HolderComparator org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicyFactory +org.apache.ignite.cache.query.AbstractContinuousQuery org.apache.ignite.cache.query.CacheQueryEntryEvent org.apache.ignite.cache.query.ContinuousQuery +org.apache.ignite.cache.query.ContinuousQueryWithTransformer org.apache.ignite.cache.query.Query org.apache.ignite.cache.query.QueryCancelledException org.apache.ignite.cache.query.ScanQuery @@ -241,6 +242,7 @@ org.apache.ignite.internal.IgniteInterruptedCheckedException org.apache.ignite.internal.IgniteKernal org.apache.ignite.internal.IgniteKernal$1 org.apache.ignite.internal.IgniteKernal$5 +org.apache.ignite.internal.IgniteKernal$6 org.apache.ignite.internal.IgniteMessagingImpl org.apache.ignite.internal.IgniteNeedReconnectException org.apache.ignite.internal.IgniteSchedulerImpl @@ -313,9 +315,9 @@ org.apache.ignite.internal.executor.GridExecutorService$1 org.apache.ignite.internal.executor.GridExecutorService$TaskTerminateListener org.apache.ignite.internal.igfs.common.IgfsIpcCommand org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl -org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$1 org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$BooleanProperty org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$ConnectionProperty +org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$EmptyStringValidator org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$IntegerProperty org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$NumberProperty org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl$PropertyValidator @@ -370,6 +372,8 @@ org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion org.apache.ignite.internal.processors.affinity.GridAffinityAssignment org.apache.ignite.internal.processors.affinity.GridAffinityMessage org.apache.ignite.internal.processors.affinity.GridAffinityUtils$AffinityJob +org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter +org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager$1 org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager$10 @@ -608,6 +612,11 @@ org.apache.ignite.internal.processors.cache.KeyCacheObject org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl org.apache.ignite.internal.processors.cache.QueryCursorImpl$State org.apache.ignite.internal.processors.cache.StoredCacheData +org.apache.ignite.internal.processors.cache.WalStateAbstractMessage +org.apache.ignite.internal.processors.cache.WalStateAckMessage +org.apache.ignite.internal.processors.cache.WalStateFinishMessage +org.apache.ignite.internal.processors.cache.WalStateManager$2 +org.apache.ignite.internal.processors.cache.WalStateProposeMessage org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy org.apache.ignite.internal.processors.cache.binary.BinaryMetadataHolder org.apache.ignite.internal.processors.cache.binary.BinaryMetadataKey @@ -917,15 +926,19 @@ org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache$8 org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache$9 org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter$RowData org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$11 -org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$7 +org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$6 +org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$8 org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$CheckpointEntryType org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$RebalanceIteratorAdapter org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager$1 org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory org.apache.ignite.internal.processors.cache.persistence.file.FileDownloader$1 org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory +org.apache.ignite.internal.processors.cache.persistence.file.PersistentStorageIOException org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl$Segment +org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl$ThrottlingPolicy +org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteSpeedBasedThrottle$ThrottleMode org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Bool @@ -939,6 +952,10 @@ org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogMan org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$FileArchiver$2 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$FileCompressor$1 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$RecordsIterator +org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager$FileArchiver$1 +org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager$FileArchiver$2 +org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager$FileCompressor$1 +org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager$RecordsIterator org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer$BufferMode org.apache.ignite.internal.processors.cache.persistence.wal.SingleSegmentLogicalRecordsIterator @@ -959,9 +976,9 @@ org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$ScanQuer org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailMetricsAdapter org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$1 org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$2 -org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$10 -org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$4$1 -org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$4$2 +org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$3$1 +org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$3$2 +org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$4 org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$5 org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$6 org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$7 @@ -997,13 +1014,16 @@ org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQuer org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEvent org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler -org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$1$1 +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$1 +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2$1 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$ContinuousQueryAsyncClosure$1 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2 +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV3 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$1 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$2 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$3 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$4 +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$5 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$CacheEntryEventImpl org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$JCacheQuery$1 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$JCacheQueryRemoteFilter @@ -1400,13 +1420,11 @@ org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils$Interna org.apache.ignite.internal.processors.platform.websession.PlatformDotNetSessionLockProcessor org.apache.ignite.internal.processors.platform.websession.PlatformDotNetSessionSetAndUnlockProcessor org.apache.ignite.internal.processors.query.GridQueryFieldMetadata -org.apache.ignite.internal.processors.query.GridQueryProcessor$10 org.apache.ignite.internal.processors.query.GridQueryProcessor$4 org.apache.ignite.internal.processors.query.GridQueryProcessor$5 org.apache.ignite.internal.processors.query.GridQueryProcessor$6 org.apache.ignite.internal.processors.query.GridQueryProcessor$7 org.apache.ignite.internal.processors.query.GridQueryProcessor$8 -org.apache.ignite.internal.processors.query.GridQueryProcessor$9 org.apache.ignite.internal.processors.query.GridQueryProcessor$SchemaOperation$1 org.apache.ignite.internal.processors.query.IgniteSQLException org.apache.ignite.internal.processors.query.QueryEntityEx @@ -1625,7 +1643,6 @@ org.apache.ignite.internal.util.IgniteUtils$7 org.apache.ignite.internal.util.IgniteUtils$8 org.apache.ignite.internal.util.IgniteUtils$9 org.apache.ignite.internal.util.StripedCompositeReadWriteLock$ReadLock -org.apache.ignite.internal.util.StripedExecutor$StealingStripe$1 org.apache.ignite.internal.util.UUIDCollectionMessage org.apache.ignite.internal.util.future.AsyncFutureListener org.apache.ignite.internal.util.future.GridCompoundFuture$1 @@ -1699,7 +1716,6 @@ org.apache.ignite.internal.util.lang.gridfunc.AtomicIntegerFactoryCallable org.apache.ignite.internal.util.lang.gridfunc.CacheEntryGetValueClosure org.apache.ignite.internal.util.lang.gridfunc.CacheEntryHasPeekPredicate org.apache.ignite.internal.util.lang.gridfunc.ClusterNodeGetIdClosure -org.apache.ignite.internal.util.lang.gridfunc.ConcurrentDequeFactoryCallable org.apache.ignite.internal.util.lang.gridfunc.ConcurrentHashSetFactoryCallable org.apache.ignite.internal.util.lang.gridfunc.ConcurrentMapFactoryCallable org.apache.ignite.internal.util.lang.gridfunc.ContainsNodeIdsPredicate @@ -2167,6 +2183,7 @@ org.apache.ignite.startup.cmdline.AboutDialog$1 org.apache.ignite.stream.StreamReceiver org.apache.ignite.stream.StreamTransformer org.apache.ignite.stream.StreamTransformer$1 +org.apache.ignite.stream.StreamTransformer$EntryProcessorWrapper org.apache.ignite.stream.StreamVisitor org.apache.ignite.stream.StreamVisitor$1 org.apache.ignite.transactions.TransactionConcurrency diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java index f2e4f32257a2c..deca296f59033 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -39,7 +40,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -54,7 +54,7 @@ public class GridCacheReplicatedSynchronousCommitTest extends GridCommonAbstract private static final String NO_COMMIT = "no_commit"; /** */ - private final Collection commSpis = new ConcurrentLinkedDeque8<>(); + private final Collection commSpis = new ConcurrentLinkedDeque<>(); /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridCompoundFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridCompoundFutureSelfTest.java index 17ef2c58e6406..a6021332e7e9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridCompoundFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridCompoundFutureSelfTest.java @@ -20,11 +20,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ConcurrentLinkedDeque8; /** * Tests compound future contracts. @@ -120,8 +120,7 @@ public void testCompleteOnException() throws Exception { public void testConcurrentCompletion() throws Exception { GridCompoundFuture fut = new GridCompoundFuture<>(CU.boolReducer()); - final ConcurrentLinkedDeque8> futs = - new ConcurrentLinkedDeque8<>(); + final ConcurrentLinkedDeque> futs = new ConcurrentLinkedDeque<>(); for (int i = 0; i < 1000; i++) { GridFutureAdapter part = new GridFutureAdapter<>(); @@ -153,8 +152,7 @@ public void testConcurrentCompletion() throws Exception { public void testConcurrentRandomCompletion() throws Exception { GridCompoundFuture fut = new GridCompoundFuture<>(CU.boolReducer()); - final ConcurrentLinkedDeque8> futs = - new ConcurrentLinkedDeque8<>(); + final ConcurrentLinkedDeque> futs = new ConcurrentLinkedDeque<>(); for (int i = 0; i < 1000; i++) { GridFutureAdapter part = new GridFutureAdapter<>(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java index 8528a085866c2..42d18386c73a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java @@ -20,13 +20,13 @@ import java.util.Date; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.lang.IgniteInClosure; -import org.jsr166.ConcurrentLinkedDeque8; /** * @@ -42,7 +42,7 @@ public class GridFutureListenPerformanceTest { public static void main(String[] args) throws InterruptedException { final LongAdder cnt = new LongAdder(); - final ConcurrentLinkedDeque8> futs = new ConcurrentLinkedDeque8<>(); + final ConcurrentLinkedDeque> futs = new ConcurrentLinkedDeque<>(); ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureQueueTest.java index 94d981b7419cd..4b6d81c2da114 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureQueueTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureQueueTest.java @@ -18,9 +18,10 @@ package org.apache.ignite.internal.util.future; import java.util.ArrayList; +import java.util.Deque; import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicLong; -import org.jsr166.ConcurrentLinkedDeque8; /** * Performance tests added to compare the same functionality in .Net. @@ -75,7 +76,7 @@ private static class QueueTest { private AtomicLong qSize = new AtomicLong(); /** */ - private final ConcurrentLinkedDeque8 queue = new ConcurrentLinkedDeque8<>(); + private final Deque queue = new ConcurrentLinkedDeque<>(); /** */ private volatile boolean stop; diff --git a/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java index d2f29b6514fd3..f542011b69bb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java @@ -20,6 +20,7 @@ import java.util.Date; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -27,7 +28,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.jsr166.ConcurrentLinkedDeque8; /** * @@ -43,7 +43,7 @@ public class GridFutureListenPerformanceTest { public static void main(String[] args) throws InterruptedException { final LongAdder cnt = new LongAdder(); - final ConcurrentLinkedDeque8> futs = new ConcurrentLinkedDeque8<>(); + final ConcurrentLinkedDeque> futs = new ConcurrentLinkedDeque<>(); ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferPerformanceTest.java index 26b160a0fcdf7..c0fc4864472b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferPerformanceTest.java @@ -19,13 +19,14 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.internal.util.GridCircularBuffer; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ConcurrentLinkedDeque8; +import org.apache.ignite.util.deque.FastSizeDeque; /** * @@ -75,7 +76,7 @@ public void testThroughput() throws Exception { */ public void testDequeueThroughput() throws Exception { - final ConcurrentLinkedDeque8 buf = new ConcurrentLinkedDeque8<>(); + final FastSizeDeque buf = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); final LongAdder cnt = new LongAdder(); final AtomicBoolean finished = new AtomicBoolean(); diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java index a6aa10a45432d..50d351b30cd28 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java @@ -17,11 +17,12 @@ package org.apache.ignite.lang.utils; +import java.util.Deque; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.util.GridCircularBuffer; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ConcurrentLinkedDeque8; /** * @@ -143,8 +144,8 @@ public void testMutliThreaded2() throws Exception { info("Created buffer: " + buf); final int iterCnt = 10_000; - final ConcurrentLinkedDeque8 evictedQ = new ConcurrentLinkedDeque8<>(); - final ConcurrentLinkedDeque8 putQ = new ConcurrentLinkedDeque8<>(); + final Deque evictedQ = new ConcurrentLinkedDeque<>(); + final Deque putQ = new ConcurrentLinkedDeque<>(); multithreaded( new Callable() { @@ -168,7 +169,7 @@ public void testMutliThreaded2() throws Exception { evictedQ.addAll(buf.items()); assert putQ.containsAll(evictedQ); - assert evictedQ.sizex() == putQ.sizex(); + assert evictedQ.size() == putQ.size(); info("Buffer: " + buf); } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiLanTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiLanTest.java index 868408e57bfb9..bb57d70a097c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiLanTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiLanTest.java @@ -18,8 +18,10 @@ package org.apache.ignite.spi.communication.tcp; import java.util.Collections; +import java.util.Deque; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.management.MBeanServer; @@ -39,7 +41,6 @@ import org.apache.ignite.testframework.config.GridTestProperties; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; -import org.jsr166.ConcurrentLinkedDeque8; /** * Class for multithreaded {@link TcpCommunicationSpi} test. @@ -91,8 +92,7 @@ private class MessageListener implements CommunicationListener { private final UUID locNodeId; /** Received messages by node. */ - private ConcurrentLinkedDeque8 rcvdMsgs = - new ConcurrentLinkedDeque8<>(); + private Deque rcvdMsgs = new ConcurrentLinkedDeque<>(); /** Count of messages received from remote nodes */ private AtomicInteger rmtMsgCnt = new AtomicInteger(); @@ -129,13 +129,6 @@ private class MessageListener implements CommunicationListener { // No-op. } - /** - * @return Queue containing received messages in receive order. - */ - public ConcurrentLinkedDeque8 receivedMessages() { - return rcvdMsgs; - } - /** * @return Count of messages received from remote node. */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index bc07028fc6a3f..d610bc3c04d53 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,7 +61,6 @@ import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; @@ -131,7 +132,7 @@ private static class MessageListener implements CommunicationListener { private final UUID locNodeId; /** Received messages by node. */ - private ConcurrentLinkedDeque8 rcvdMsgs = new ConcurrentLinkedDeque8<>(); + private Deque rcvdMsgs = new ConcurrentLinkedDeque<>(); /** Count of messages received from remote nodes */ private AtomicInteger rmtMsgCnt = new AtomicInteger(); @@ -173,7 +174,7 @@ private static class MessageListener implements CommunicationListener { /** * @return Queue containing received messages in receive order. */ - public ConcurrentLinkedDeque8 receivedMsgs() { + public Deque receivedMsgs() { return rcvdMsgs; } @@ -186,7 +187,7 @@ public int remoteMessageCount() { /** {@inheritDoc} */ @Override public String toString() { - return "MessageListener [nodeId=" + locNodeId + ", rcvd=" + rcvdMsgs.sizex() + ']'; + return "MessageListener [nodeId=" + locNodeId + ", rcvd=" + rcvdMsgs.size() + ']'; } } @@ -200,7 +201,7 @@ public void testSendToRandomNodesMultithreaded() throws Exception { assertEquals("Invalid listener count", getSpiCount(), lsnrs.size()); - final ConcurrentMap> msgs = new ConcurrentHashMap<>(); + final ConcurrentMap> msgs = new ConcurrentHashMap<>(); final int iterationCnt = 5000; @@ -221,11 +222,10 @@ public void testSendToRandomNodesMultithreaded() throws Exception { spis.get(from.id()).sendMessage(to, msg); - ConcurrentLinkedDeque8 queue = msgs.get(to.id()); + Deque queue = msgs.get(to.id()); if (queue == null) { - ConcurrentLinkedDeque8 old = msgs.putIfAbsent(to.id(), - queue = new ConcurrentLinkedDeque8<>()); + Deque old = msgs.putIfAbsent(to.id(), queue = new ConcurrentLinkedDeque<>()); if (old != null) queue = old; @@ -251,25 +251,25 @@ public void testSendToRandomNodesMultithreaded() throws Exception { U.sleep(IDLE_CONN_TIMEOUT * 2); // Now validate all sent and received messages. - for (Entry> e : msgs.entrySet()) { + for (Entry> e : msgs.entrySet()) { UUID to = e.getKey(); - ConcurrentLinkedDeque8 sent = e.getValue(); + Deque sent = e.getValue(); MessageListener lsnr = lsnrs.get(to); - ConcurrentLinkedDeque8 rcvd = lsnr.receivedMsgs(); + Deque rcvd = lsnr.receivedMsgs(); info(">>> Node " + to + " received " + lsnr.remoteMessageCount() + " remote messages of " + - rcvd.sizex() + " total"); + rcvd.size() + " total"); - for (int i = 0; i < 3 && sent.sizex() != rcvd.sizex(); i++) { - info("Check failed for node [node=" + to + ", sent=" + sent.sizex() + ", rcvd=" + rcvd.sizex() + ']'); + for (int i = 0; i < 3 && sent.size() != rcvd.size(); i++) { + info("Check failed for node [node=" + to + ", sent=" + sent.size() + ", rcvd=" + rcvd.size() + ']'); U.sleep(2000); } - assertEquals("Sent and received messages count mismatch.", sent.sizex(), rcvd.sizex()); + assertEquals("Sent and received messages count mismatch.", sent.size(), rcvd.size()); assertTrue("Listener did not receive some messages: " + lsnr, rcvd.containsAll(sent)); assertTrue("Listener received extra messages: " + lsnr, sent.containsAll(rcvd)); diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java index 9dfea18260f26..be8a174a6fbb8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantLock; @@ -58,7 +59,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.IgnitePortProtocol; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED; import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED; @@ -760,7 +760,7 @@ private static class HadoopProcess extends ReentrantLock { private Collection reducers; /** Tasks. */ - private final Collection tasks = new ConcurrentLinkedDeque8<>(); + private final Collection tasks = new ConcurrentLinkedDeque<>(); /** Terminated flag. */ private volatile boolean terminated; diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index a241a041dd57d..c5a4f77d03bae 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -29,6 +29,7 @@ import java.nio.channels.SocketChannel; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -68,7 +69,6 @@ import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; /** * Hadoop external communication class. @@ -229,7 +229,7 @@ public class HadoopExternalCommunication { private ShmemAcceptWorker shmemAcceptWorker; /** Shared memory workers. */ - private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); + private final Collection shmemWorkers = new ConcurrentLinkedDeque<>(); /** Clients. */ private final ConcurrentMap clients = GridConcurrentFactory.newMap();