Skip to content

Commit

Permalink
IGNITE-2545 : Allocate ArrayList for futures only after second future…
Browse files Browse the repository at this point in the history
… is added. This closes #929.
  • Loading branch information
vozerov-gridgain committed Aug 10, 2016
1 parent ae04efb commit e5ae9fc
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 50 deletions.
Expand Up @@ -426,10 +426,10 @@ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<Boolean> fut = futs.get(i); IgniteInternalFuture<Boolean> fut = future(i);


if (!isMini(fut)) if (!isMini(fut))
continue; continue;
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheContext;
Expand All @@ -50,7 +49,6 @@
import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -277,7 +275,7 @@ else if (mappedKeys != null)
// Optimization to avoid going through compound future, // Optimization to avoid going through compound future,
// if getAsync() has been completed and no other futures added to this // if getAsync() has been completed and no other futures added to this
// compound future. // compound future.
if (fut.isDone() && futuresSize() == 0) { if (fut.isDone() && futuresCount() == 0) {
if (fut.error() != null) if (fut.error() != null)
onDone(fut.error()); onDone(fut.error());
else else
Expand Down
Expand Up @@ -546,10 +546,10 @@ void onResult(UUID nodeId, GridDhtLockResponse res) {
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
MiniFuture mini = (MiniFuture)futs.get(i); MiniFuture mini = (MiniFuture) future(i);


if (mini.futureId().equals(miniId)) { if (mini.futureId().equals(miniId)) {
if (!mini.isDone()) if (!mini.isDone())
Expand Down
Expand Up @@ -526,10 +526,10 @@ public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) {
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i); IgniteInternalFuture<IgniteInternalTx> fut = future(i);


if (!isMini(fut)) if (!isMini(fut))
continue; continue;
Expand Down
Expand Up @@ -471,10 +471,10 @@ public Set<KeyCacheObject> requestedKeys() {
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<Boolean> fut = futs.get(i); IgniteInternalFuture<Boolean> fut = future(i);


if (!isMini(fut)) if (!isMini(fut))
continue; continue;
Expand Down
Expand Up @@ -489,10 +489,10 @@ else if (log.isDebugEnabled())
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<Boolean> fut = futs.get(i); IgniteInternalFuture<Boolean> fut = future(i);


if (!isMini(fut)) if (!isMini(fut))
continue; continue;
Expand Down
Expand Up @@ -229,10 +229,10 @@ private void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i); IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i);


if (!isMini(fut)) if (!isMini(fut))
continue; continue;
Expand Down
Expand Up @@ -190,10 +190,10 @@ void onError(Throwable e, boolean discoThread) {
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i); IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i);


if (!isMini(fut)) if (!isMini(fut))
continue; continue;
Expand Down
Expand Up @@ -132,10 +132,10 @@ public GridNearPessimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearT
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) { private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy. // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation. // Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
MiniFuture mini = (MiniFuture)futs.get(i); MiniFuture mini = (MiniFuture) future(i);


if (mini.futureId().equals(miniId)) { if (mini.futureId().equals(miniId)) {
if (!mini.isDone()) if (!mini.isDone())
Expand Down
Expand Up @@ -195,9 +195,9 @@ public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
if (!isDone()) { if (!isDone()) {
FinishMiniFuture finishFut = null; FinishMiniFuture finishFut = null;


synchronized (futs) { synchronized (sync) {
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i); IgniteInternalFuture<IgniteInternalTx> fut = future(i);


if (fut.getClass() == FinishMiniFuture.class) { if (fut.getClass() == FinishMiniFuture.class) {
FinishMiniFuture f = (FinishMiniFuture)fut; FinishMiniFuture f = (FinishMiniFuture)fut;
Expand Down
Expand Up @@ -33,6 +33,8 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;


/** /**
Expand All @@ -53,8 +55,11 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD = private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls"); AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");


/** Futures. */ /** Sync object */
protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>(); protected final Object sync = new Object();

/** Possible values: null (no future), IgniteInternalFuture instance (single future) or List of futures */
private volatile Object futs;


/** Reducer. */ /** Reducer. */
@GridToStringInclude @GridToStringInclude
Expand Down Expand Up @@ -155,8 +160,14 @@ public GridCompoundFuture(@Nullable IgniteReducer<T, R> rdc) {
* @return Collection of futures. * @return Collection of futures.
*/ */
public Collection<IgniteInternalFuture<T>> futures() { public Collection<IgniteInternalFuture<T>> futures() {
synchronized (futs) { synchronized (sync) {
return new ArrayList<>(futs); if(futs == null)
return Collections.emptyList();

if (futs instanceof IgniteInternalFuture)
return Collections.singletonList((IgniteInternalFuture<T>)futs);

return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
} }
} }


Expand All @@ -179,10 +190,10 @@ protected boolean ignoreFailure(Throwable err) {
*/ */
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
public boolean hasPending() { public boolean hasPending() {
synchronized (futs) { synchronized (sync) {
// Avoid iterator creation and collection copy. // Avoid iterator creation and collection copy.
for (int i = 0; i < futs.size(); i++) { for (int i = 0; i < futuresCount(); i++) {
IgniteInternalFuture<T> fut = futs.get(i); IgniteInternalFuture<T> fut = future(i);


if (!fut.isDone()) if (!fut.isDone())
return true; return true;
Expand All @@ -197,11 +208,23 @@ public boolean hasPending() {
* *
* @param fut Future to add. * @param fut Future to add.
*/ */
@SuppressWarnings("unchecked")
public void add(IgniteInternalFuture<T> fut) { public void add(IgniteInternalFuture<T> fut) {
assert fut != null; assert fut != null;


synchronized (futs) { synchronized (sync) {
futs.add(fut); if (futs == null)
futs = fut;
else if (futs instanceof IgniteInternalFuture) {
Collection<IgniteInternalFuture> futs0 = new ArrayList<>(4);

futs0.add((IgniteInternalFuture)futs);
futs0.add(fut);

futs = futs0;
}
else
((Collection<IgniteInternalFuture>)futs).add(fut);
} }


fut.listen(this); fut.listen(this);
Expand All @@ -217,8 +240,8 @@ public void add(IgniteInternalFuture<T> fut) {
} }


/** /**
* @return {@code True} if this future was initialized. Initialization happens when * @return {@code True} if this future was initialized. Initialization happens when {@link #markInitialized()}
* {@link #markInitialized()} method is called on future. * method is called on future.
*/ */
public boolean initialized() { public boolean initialized() {
return initFlag == INIT_FLAG; return initFlag == INIT_FLAG;
Expand All @@ -236,7 +259,7 @@ public void markInitialized() {
* Check completeness of the future. * Check completeness of the future.
*/ */
private void checkComplete() { private void checkComplete() {
if (initialized() && !isDone() && lsnrCalls == futuresSize()) { if (initialized() && !isDone() && lsnrCalls == futuresCount()) {
try { try {
onDone(rdc != null ? rdc.reduce() : null); onDone(rdc != null ? rdc.reduce() : null);
} }
Expand All @@ -255,12 +278,39 @@ private void checkComplete() {
} }
} }


/**
* Returns future at the specified position in this list.
*
* @param idx - index index of the element to return
* @return Future.
*/
@SuppressWarnings("unchecked")
protected IgniteInternalFuture<T> future(int idx) {
assert Thread.holdsLock(sync);
assert futs != null && idx >= 0 && idx < futuresCount();

if (futs instanceof IgniteInternalFuture) {
assert idx == 0;

return (IgniteInternalFuture<T>)futs;
}
else
return ((List<IgniteInternalFuture>)futs).get(idx);
}

/** /**
* @return Futures size. * @return Futures size.
*/ */
protected int futuresSize() { @SuppressWarnings("unchecked")
synchronized (futs) { protected int futuresCount() {
return futs.size(); synchronized (sync) {
if (futs == null)
return 0;

if (futs instanceof IgniteInternalFuture)
return 1;

return ((Collection<IgniteInternalFuture>)futs).size();
} }
} }


Expand All @@ -271,11 +321,11 @@ protected int futuresSize() {
"cancelled", isCancelled(), "cancelled", isCancelled(),
"err", error(), "err", error(),
"futs", "futs",
F.viewReadOnly(futures(), new C1<IgniteInternalFuture<T>, String>() { F.viewReadOnly(futures(), new C1<IgniteInternalFuture<T>, String>() {
@Override public String apply(IgniteInternalFuture<T> f) { @Override public String apply(IgniteInternalFuture<T> f) {
return Boolean.toString(f.isDone()); return Boolean.toString(f.isDone());
} }
}) })
); );
} }
} }

0 comments on commit e5ae9fc

Please sign in to comment.