Skip to content

Commit

Permalink
ignite-7517 Almost all usages of ConcurrentLinkedDeque8 class are rem…
Browse files Browse the repository at this point in the history
…oved

Signed-off-by: Andrey Gura <agura@apache.org>
  • Loading branch information
andrey-kuznetsov authored and agura committed Mar 5, 2018
1 parent 18c6b4a commit 768063c
Show file tree
Hide file tree
Showing 29 changed files with 506 additions and 169 deletions.
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -36,8 +38,6 @@
import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import java.util.concurrent.ConcurrentHashMap;
import org.jsr166.ConcurrentLinkedDeque8;


/** /**
* IGFS client logger writing data to the file. * IGFS client logger writing data to the file.
Expand Down Expand Up @@ -234,7 +234,7 @@ private IgfsLogger(String endpoint, String igfsName, String dir, int batchSize)


file = new File(dirFile, "igfs-log-" + igfsName + "-" + pid + ".csv"); file = new File(dirFile, "igfs-log-" + igfsName + "-" + pid + ".csv");


entries = new ConcurrentLinkedDeque8<>(); entries = new ConcurrentLinkedDeque<>();


cnt = new AtomicInteger(); cnt = new AtomicInteger();
useCnt = new AtomicInteger(); useCnt = new AtomicInteger();
Expand Down Expand Up @@ -727,7 +727,7 @@ private void flush() {
try { try {
entries0 = entries; entries0 = entries;


entries = new ConcurrentLinkedDeque8<>(); entries = new ConcurrentLinkedDeque<>();
} }
finally { finally {
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -96,7 +97,6 @@
import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;


import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
Expand Down Expand Up @@ -164,8 +164,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final UUID locNodeId; private final UUID locNodeId;


/** Cache for messages that were received prior to discovery. */ /** Cache for messages that were received prior to discovery. */
private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap = private final ConcurrentMap<UUID, Deque<DelayedMessage>> waitMap = new ConcurrentHashMap<>();
new ConcurrentHashMap<>();


/** Communication message listener. */ /** Communication message listener. */
private CommunicationListener<Serializable> commLsnr; private CommunicationListener<Serializable> commLsnr;
Expand Down Expand Up @@ -774,7 +773,7 @@ private void format(StringBuilder b, Collection<IgnitePair<Long>> pairs, SimpleD
lock.writeLock().lock(); lock.writeLock().lock();


try { try {
ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId); Deque<DelayedMessage> waitList = waitMap.remove(nodeId);


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Removed messages from discovery startup delay list " + log.debug("Removed messages from discovery startup delay list " +
Expand Down Expand Up @@ -804,9 +803,9 @@ private void format(StringBuilder b, Collection<IgnitePair<Long>> pairs, SimpleD
try { try {
started = true; started = true;


for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) { for (Entry<UUID, Deque<DelayedMessage>> e : waitMap.entrySet()) {
if (ctx.discovery().node(e.getKey()) != null) { if (ctx.discovery().node(e.getKey()) != null) {
ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey()); Deque<DelayedMessage> waitList = waitMap.remove(e.getKey());


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Processing messages from discovery startup delay list: " + waitList); log.debug("Processing messages from discovery startup delay list: " + waitList);
Expand Down Expand Up @@ -954,8 +953,11 @@ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
log.debug("Adding message to waiting list [senderId=" + nodeId + log.debug("Adding message to waiting list [senderId=" + nodeId +
", msg=" + msg + ']'); ", msg=" + msg + ']');


ConcurrentLinkedDeque8<DelayedMessage> list = Deque<DelayedMessage> list = F.<UUID, Deque<DelayedMessage>>addIfAbsent(
F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque()); waitMap,
nodeId,
ConcurrentLinkedDeque::new
);


assert list != null; assert list != null;


Expand Down
Expand Up @@ -19,13 +19,15 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTask;
Expand All @@ -46,7 +48,6 @@
import org.apache.ignite.spi.deployment.DeploymentResource; import org.apache.ignite.spi.deployment.DeploymentResource;
import org.apache.ignite.spi.deployment.DeploymentSpi; import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;


import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED; import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED; import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
Expand All @@ -60,8 +61,7 @@
*/ */
class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** Deployment cache by class name. */ /** Deployment cache by class name. */
private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache = private final ConcurrentMap<String, Deque<GridDeployment>> cache = new ConcurrentHashMap<>();
new ConcurrentHashMap<>();


/** Mutex. */ /** Mutex. */
private final Object mux = new Object(); private final Object mux = new Object();
Expand Down Expand Up @@ -110,7 +110,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
Collection<GridDeployment> deps = new ArrayList<>(); Collection<GridDeployment> deps = new ArrayList<>();


synchronized (mux) { synchronized (mux) {
for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values()) for (Deque<GridDeployment> depList : cache.values())
for (GridDeployment d : depList) for (GridDeployment d : depList)
if (!deps.contains(d)) if (!deps.contains(d))
deps.add(d); deps.add(d);
Expand All @@ -122,7 +122,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) { @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
synchronized (mux) { synchronized (mux) {
for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) for (Deque<GridDeployment> deps : cache.values())
for (GridDeployment dep : deps) for (GridDeployment dep : deps)
if (dep.classLoaderId().equals(ldrId)) if (dep.classLoaderId().equals(ldrId))
return dep; return dep;
Expand Down Expand Up @@ -232,7 +232,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
* @return Deployment. * @return Deployment.
*/ */
@Nullable private GridDeployment deployment(String alias) { @Nullable private GridDeployment deployment(String alias) {
ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias); Deque<GridDeployment> deps = cache.get(alias);


if (deps != null) { if (deps != null) {
GridDeployment dep = deps.peekFirst(); GridDeployment dep = deps.peekFirst();
Expand Down Expand Up @@ -260,10 +260,10 @@ private GridDeployment deploy(DeploymentMode depMode, ClassLoader ldr, Class<?>
boolean fireEvt = false; boolean fireEvt = false;


try { try {
ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null; Deque<GridDeployment> cachedDeps = null;


// Find existing class loader info. // Find existing class loader info.
for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) { for (Deque<GridDeployment> deps : cache.values()) {
for (GridDeployment d : deps) { for (GridDeployment d : deps) {
if (d.classLoader() == ldr) { if (d.classLoader() == ldr) {
// Cache class and alias. // Cache class and alias.
Expand Down Expand Up @@ -304,8 +304,11 @@ private GridDeployment deploy(DeploymentMode depMode, ClassLoader ldr, Class<?>
assert fireEvt : "Class was not added to newly created deployment [cls=" + cls + assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
", depMode=" + depMode + ", dep=" + dep + ']'; ", depMode=" + depMode + ", dep=" + dep + ']';


ConcurrentLinkedDeque8<GridDeployment> deps = Deque<GridDeployment> deps = F.<String, Deque<GridDeployment>>addIfAbsent(
F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque()); cache,
alias,
ConcurrentLinkedDeque::new
);


if (!deps.isEmpty()) { if (!deps.isEmpty()) {
for (GridDeployment d : deps) { for (GridDeployment d : deps) {
Expand Down Expand Up @@ -512,8 +515,8 @@ private void undeploy(ClassLoader ldr) {
Collection<GridDeployment> doomed = new HashSet<>(); Collection<GridDeployment> doomed = new HashSet<>();


synchronized (mux) { synchronized (mux) {
for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) { for (Iterator<Deque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
ConcurrentLinkedDeque8<GridDeployment> deps = i1.next(); Deque<GridDeployment> deps = i1.next();


for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) { for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
GridDeployment dep = i2.next(); GridDeployment dep = i2.next();
Expand Down
Expand Up @@ -27,6 +27,8 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -63,9 +65,8 @@
import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import org.jsr166.ConcurrentLinkedDeque8;


import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS;
import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.IgniteSystemProperties.getInteger;
Expand Down Expand Up @@ -122,7 +123,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap(); private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();


/** Finish futures. */ /** Finish futures. */
private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>(); private final FastSizeDeque<FinishLockFuture> finishFuts = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());


/** Nested listener calls. */ /** Nested listener calls. */
private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() { private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() {
Expand Down
Expand Up @@ -17,7 +17,10 @@


package org.apache.ignite.internal.processors.cache; package org.apache.ignite.internal.processors.cache;


import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -26,8 +29,7 @@
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;
import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.util.deque.FastSizeDeque;
import org.jsr166.ConcurrentLinkedDeque8;


/** /**
* *
Expand Down Expand Up @@ -66,7 +68,7 @@ public GridDeferredAckMessageSender(GridTimeoutProcessor time,
* @param nodeId Node ID. * @param nodeId Node ID.
* @param vers Versions to send. * @param vers Versions to send.
*/ */
public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<T> vers); public abstract void finish(UUID nodeId, Collection<T> vers);


/** /**
* *
Expand Down Expand Up @@ -116,7 +118,7 @@ private class DeferredAckMessageBuffer extends ReentrantReadWriteLock implements
private AtomicBoolean guard = new AtomicBoolean(false); private AtomicBoolean guard = new AtomicBoolean(false);


/** Versions. */ /** Versions. */
private ConcurrentLinkedDeque8<T> vers = new ConcurrentLinkedDeque8<>(); private FastSizeDeque<T> vers = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());


/** Node ID. */ /** Node ID. */
private final UUID nodeId; private final UUID nodeId;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -56,7 +57,7 @@
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8; import org.apache.ignite.util.deque.FastSizeDeque;


import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
Expand Down Expand Up @@ -139,7 +140,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements


/** Remove queue. */ /** Remove queue. */
@GridToStringExclude @GridToStringExclude
private final ConcurrentLinkedDeque8<RemovedEntryHolder> rmvQueue = new ConcurrentLinkedDeque8<>(); private final FastSizeDeque<RemovedEntryHolder> rmvQueue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());


/** Group reservations. */ /** Group reservations. */
@GridToStringExclude @GridToStringExclude
Expand Down
Expand Up @@ -24,12 +24,13 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8; import org.apache.ignite.util.deque.FastSizeDeque;


/** /**
* *
Expand All @@ -49,7 +50,7 @@ public class CacheContinuousQueryEventBuffer {
private AtomicReference<Batch> curBatch = new AtomicReference<>(); private AtomicReference<Batch> curBatch = new AtomicReference<>();


/** */ /** */
private ConcurrentLinkedDeque8<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque8<>(); private FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());


/** */ /** */
private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>(); private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -48,8 +49,8 @@
import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap; import org.jsr166.ConcurrentLinkedHashMap;


import static javax.cache.Cache.Entry; import static javax.cache.Cache.Entry;
Expand Down Expand Up @@ -879,7 +880,7 @@ private void wakeUp() {
*/ */
private class Flusher extends GridWorker { private class Flusher extends GridWorker {
/** Queue to flush. */ /** Queue to flush. */
private final ConcurrentLinkedDeque8<IgniteBiTuple<K, StatefulValue<K,V>>> queue; private final FastSizeDeque<IgniteBiTuple<K, StatefulValue<K,V>>> queue;


/** Flusher write map. */ /** Flusher write map. */
private final ConcurrentHashMap<K, StatefulValue<K,V>> flusherWriteMap; private final ConcurrentHashMap<K, StatefulValue<K,V>> flusherWriteMap;
Expand Down Expand Up @@ -917,7 +918,7 @@ protected Flusher(String igniteInstanceName,
flusherWriteMap = null; flusherWriteMap = null;
} }
else { else {
queue = new ConcurrentLinkedDeque8<>(); queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl); flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl);
} }
} }
Expand All @@ -937,8 +938,8 @@ protected void start() {
*/ */
private void putToFlusherWriteCache( private void putToFlusherWriteCache(
K key, K key,
StatefulValue<K, V> newVal) StatefulValue<K, V> newVal
throws IgniteInterruptedCheckedException { ) throws IgniteInterruptedCheckedException {
assert !writeCoalescing : "Unexpected write coalescing."; assert !writeCoalescing : "Unexpected write coalescing.";


if (queue.sizex() > flusherCacheCriticalSize) { if (queue.sizex() > flusherCacheCriticalSize) {
Expand Down
Expand Up @@ -90,7 +90,6 @@
import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState; import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap; import org.jsr166.ConcurrentLinkedHashMap;


import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
Expand Down Expand Up @@ -227,7 +226,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
} }


@Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) { @Override public void finish(UUID nodeId, Collection<GridCacheVersion> vers) {
GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers); GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers);


cctx.kernalContext().gateway().readLock(); cctx.kernalContext().gateway().readLock();
Expand Down
Expand Up @@ -18,14 +18,15 @@
package org.apache.ignite.internal.processors.continuous; package org.apache.ignite.internal.processors.continuous;


import java.util.Collection; import java.util.Collection;
import org.jsr166.ConcurrentLinkedDeque8; import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.util.deque.FastSizeDeque;


/** /**
* Continuous routine batch adapter. * Continuous routine batch adapter.
*/ */
public class GridContinuousBatchAdapter implements GridContinuousBatch { public class GridContinuousBatchAdapter implements GridContinuousBatch {
/** Buffer. */ /** Buffer. */
protected final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); protected final FastSizeDeque<Object> buf = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void add(Object obj) { @Override public void add(Object obj) {
Expand Down

0 comments on commit 768063c

Please sign in to comment.