From e5ae9fcad5f75dd0bc2225132da926e8985d18e0 Mon Sep 17 00:00:00 2001 From: vozerov-gridgain Date: Wed, 10 Aug 2016 12:37:34 +0300 Subject: [PATCH] IGNITE-2545 : Allocate ArrayList for futures only after second future is added. This closes #929. --- .../GridCacheTxRecoveryFuture.java | 6 +- .../distributed/dht/GridDhtGetFuture.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 6 +- .../dht/GridDhtTxPrepareFuture.java | 6 +- .../colocated/GridDhtColocatedLockFuture.java | 6 +- .../distributed/near/GridNearLockFuture.java | 6 +- ...OptimisticSerializableTxPrepareFuture.java | 6 +- .../GridNearOptimisticTxPrepareFuture.java | 6 +- .../GridNearPessimisticTxPrepareFuture.java | 6 +- .../near/GridNearTxFinishFuture.java | 6 +- .../util/future/GridCompoundFuture.java | 90 ++++++++++++++----- 11 files changed, 98 insertions(+), 50 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 731238677168d..c07a817001c75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -426,10 +426,10 @@ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) { @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 913580f8c02ad..d2a3b3c022ddc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -29,7 +29,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -50,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -277,7 +275,7 @@ else if (mappedKeys != null) // Optimization to avoid going through compound future, // if getAsync() has been completed and no other futures added to this // compound future. - if (fut.isDone() && futuresSize() == 0) { + if (fut.isDone() && futuresCount() == 0) { if (fut.error() != null) onDone(fut.error()); else diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 64b874577fb56..187c8a4bf84e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -546,10 +546,10 @@ void onResult(UUID nodeId, GridDhtLockResponse res) { @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - MiniFuture mini = (MiniFuture)futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + MiniFuture mini = (MiniFuture) future(i); if (mini.futureId().equals(miniId)) { if (!mini.isDone()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e9805aa033abf..280089771cd2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -526,10 +526,10 @@ public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) { @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index f77efeef9bac0..05b4a2bade2d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -471,10 +471,10 @@ public Set requestedKeys() { @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 4b6448bd2c9ad..3b53c5e269fe8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -489,10 +489,10 @@ else if (log.isDebugEnabled()) @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 6515140a1a958..cd0ce4460532f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -229,10 +229,10 @@ private void onError(@Nullable GridDistributedTxMapping m, Throwable e) { @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 1ea99c466c85e..7a49422eee1b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -190,10 +190,10 @@ void onError(Throwable e, boolean discoThread) { @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 5d347d7c5f23d..a353ed55cb555 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -132,10 +132,10 @@ public GridNearPessimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearT @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation. - for (int i = 0; i < futs.size(); i++) { - MiniFuture mini = (MiniFuture)futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + MiniFuture mini = (MiniFuture) future(i); if (mini.futureId().equals(miniId)) { if (!mini.isDone()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 39f3ff3e06712..c96651c2b6e99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -195,9 +195,9 @@ public void onResult(UUID nodeId, GridNearTxFinishResponse res) { if (!isDone()) { FinishMiniFuture finishFut = null; - synchronized (futs) { - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + synchronized (sync) { + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (fut.getClass() == FinishMiniFuture.class) { FinishMiniFuture f = (FinishMiniFuture)fut; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 3409341df61d7..b83133a204cd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -33,6 +33,8 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** @@ -53,8 +55,11 @@ public class GridCompoundFuture extends GridFutureAdapter implements Ig private static final AtomicIntegerFieldUpdater LSNR_CALLS_UPD = AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls"); - /** Futures. */ - protected final ArrayList> futs = new ArrayList<>(); + /** Sync object */ + protected final Object sync = new Object(); + + /** Possible values: null (no future), IgniteInternalFuture instance (single future) or List of futures */ + private volatile Object futs; /** Reducer. */ @GridToStringInclude @@ -155,8 +160,14 @@ public GridCompoundFuture(@Nullable IgniteReducer rdc) { * @return Collection of futures. */ public Collection> futures() { - synchronized (futs) { - return new ArrayList<>(futs); + synchronized (sync) { + if(futs == null) + return Collections.emptyList(); + + if (futs instanceof IgniteInternalFuture) + return Collections.singletonList((IgniteInternalFuture)futs); + + return new ArrayList<>((Collection>)futs); } } @@ -179,10 +190,10 @@ protected boolean ignoreFailure(Throwable err) { */ @SuppressWarnings("ForLoopReplaceableByForEach") public boolean hasPending() { - synchronized (futs) { + synchronized (sync) { // Avoid iterator creation and collection copy. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture fut = futs.get(i); + for (int i = 0; i < futuresCount(); i++) { + IgniteInternalFuture fut = future(i); if (!fut.isDone()) return true; @@ -197,11 +208,23 @@ public boolean hasPending() { * * @param fut Future to add. */ + @SuppressWarnings("unchecked") public void add(IgniteInternalFuture fut) { assert fut != null; - synchronized (futs) { - futs.add(fut); + synchronized (sync) { + if (futs == null) + futs = fut; + else if (futs instanceof IgniteInternalFuture) { + Collection futs0 = new ArrayList<>(4); + + futs0.add((IgniteInternalFuture)futs); + futs0.add(fut); + + futs = futs0; + } + else + ((Collection)futs).add(fut); } fut.listen(this); @@ -217,8 +240,8 @@ public void add(IgniteInternalFuture fut) { } /** - * @return {@code True} if this future was initialized. Initialization happens when - * {@link #markInitialized()} method is called on future. + * @return {@code True} if this future was initialized. Initialization happens when {@link #markInitialized()} + * method is called on future. */ public boolean initialized() { return initFlag == INIT_FLAG; @@ -236,7 +259,7 @@ public void markInitialized() { * Check completeness of the future. */ private void checkComplete() { - if (initialized() && !isDone() && lsnrCalls == futuresSize()) { + if (initialized() && !isDone() && lsnrCalls == futuresCount()) { try { onDone(rdc != null ? rdc.reduce() : null); } @@ -255,12 +278,39 @@ private void checkComplete() { } } + /** + * Returns future at the specified position in this list. + * + * @param idx - index index of the element to return + * @return Future. + */ + @SuppressWarnings("unchecked") + protected IgniteInternalFuture future(int idx) { + assert Thread.holdsLock(sync); + assert futs != null && idx >= 0 && idx < futuresCount(); + + if (futs instanceof IgniteInternalFuture) { + assert idx == 0; + + return (IgniteInternalFuture)futs; + } + else + return ((List)futs).get(idx); + } + /** * @return Futures size. */ - protected int futuresSize() { - synchronized (futs) { - return futs.size(); + @SuppressWarnings("unchecked") + protected int futuresCount() { + synchronized (sync) { + if (futs == null) + return 0; + + if (futs instanceof IgniteInternalFuture) + return 1; + + return ((Collection)futs).size(); } } @@ -271,11 +321,11 @@ protected int futuresSize() { "cancelled", isCancelled(), "err", error(), "futs", - F.viewReadOnly(futures(), new C1, String>() { - @Override public String apply(IgniteInternalFuture f) { - return Boolean.toString(f.isDone()); - } - }) + F.viewReadOnly(futures(), new C1, String>() { + @Override public String apply(IgniteInternalFuture f) { + return Boolean.toString(f.isDone()); + } + }) ); } }