Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed May 7, 2015
1 parent 93876df commit a238ce3
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 156 deletions.
Expand Up @@ -510,7 +510,7 @@ else if (log.isDebugEnabled())
* @return Future. * @return Future.
*/ */
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
@Nullable public <T> GridCacheFuture<T> future(GridCacheVersion ver, IgniteUuid futId) { @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
Collection<? extends GridCacheFuture> futs = this.futs.get(ver); Collection<? extends GridCacheFuture> futs = this.futs.get(ver);


if (futs != null) if (futs != null)
Expand All @@ -519,7 +519,7 @@ else if (log.isDebugEnabled())
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Found future in futures map: " + fut); log.debug("Found future in futures map: " + fut);


return (GridCacheFuture<T>)fut; return fut;
} }


if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down
Expand Up @@ -284,7 +284,7 @@ public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() { @Override public IgniteInternalFuture<?> prepareAsync() {
if (optimistic()) { if (optimistic()) {
assert isSystemInvalidate(); assert isSystemInvalidate();


Expand All @@ -296,7 +296,6 @@ public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
nearMiniId, nearMiniId,
null, null,
true, true,
null,
null); null);
} }


Expand All @@ -305,14 +304,13 @@ public void nearFinishMiniId(IgniteUuid nearFinMiniId) {


if (fut == null) { if (fut == null) {
// Future must be created before any exception can be thrown. // Future must be created before any exception can be thrown.
if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>( if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
cctx, cctx,
this, this,
nearMiniId, nearMiniId,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
true, true,
needReturnValue(), needReturnValue(),
null,
null))) null)))
return prepFut.get(); return prepFut.get();
} }
Expand Down Expand Up @@ -371,16 +369,15 @@ public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
* @param lastBackups IDs of backup nodes receiving last prepare request. * @param lastBackups IDs of backup nodes receiving last prepare request.
* @return Future that will be completed when locks are acquired. * @return Future that will be completed when locks are acquired.
*/ */
public IgniteInternalFuture<IgniteInternalTx> prepareAsync( public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
@Nullable Iterable<IgniteTxEntry> reads, @Nullable Iterable<IgniteTxEntry> reads,
@Nullable Iterable<IgniteTxEntry> writes, @Nullable Iterable<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap, Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId, long msgId,
IgniteUuid nearMiniId, IgniteUuid nearMiniId,
Map<UUID, Collection<UUID>> txNodes, Map<UUID, Collection<UUID>> txNodes,
boolean last, boolean last,
Collection<UUID> lastBackups, Collection<UUID> lastBackups
IgniteInClosure<GridNearTxPrepareResponse> completeCb
) { ) {
// In optimistic mode prepare still can be called explicitly from salvageTx. // In optimistic mode prepare still can be called explicitly from salvageTx.
GridDhtTxPrepareFuture fut = prepFut.get(); GridDhtTxPrepareFuture fut = prepFut.get();
Expand All @@ -389,29 +386,28 @@ public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
init(); init();


// Future must be created before any exception can be thrown. // Future must be created before any exception can be thrown.
if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>( if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
cctx, cctx,
this, this,
nearMiniId, nearMiniId,
verMap, verMap,
last, last,
needReturnValue(), needReturnValue(),
lastBackups, lastBackups))) {
completeCb))) {
GridDhtTxPrepareFuture f = prepFut.get(); GridDhtTxPrepareFuture f = prepFut.get();


assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';


return f; return chainOnePhasePrepare(f);
} }
} }
else { else {
assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
"[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';


// Prepare was called explicitly. // Prepare was called explicitly.
return fut; return chainOnePhasePrepare(fut);
} }


if (state() != PREPARING) { if (state() != PREPARING) {
Expand Down Expand Up @@ -475,7 +471,7 @@ public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
} }
} }


return fut; return chainOnePhasePrepare(fut);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -517,8 +513,8 @@ public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
} }
} }
else else
prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { prep.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { @Override public void apply(IgniteInternalFuture<?> f) {
try { try {
f.get(); // Check for errors of a parent future. f.get(); // Check for errors of a parent future.


Expand Down Expand Up @@ -605,8 +601,8 @@ public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
else { else {
prepFut.complete(); prepFut.complete();


prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { @Override public void apply(IgniteInternalFuture<?> f) {
try { try {
f.get(); // Check for errors of a parent future. f.get(); // Check for errors of a parent future.
} }
Expand Down Expand Up @@ -686,7 +682,7 @@ public IgniteInternalFuture<IgniteInternalTx> prepareAsync(


/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return prepFut.get(); return prepFut.get();
} }


Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
Expand Down Expand Up @@ -885,6 +886,32 @@ private IgniteInternalFuture<GridCacheReturn> obtainLockAsync(
*/ */
protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut); protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);


/**
* @return {@code True} if transaction if finished on prepare step.
*/
protected final boolean commitOnPrepare() {
return onePhaseCommit() && !near();
}

/**
* @param prepFut Prepare future.
* @return If transaction if finished on prepare step returns future which is completed after transaction finish.
*/
protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare(
final GridDhtTxPrepareFuture prepFut) {
if (commitOnPrepare()) {
return finishFuture().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() {
@Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut)
throws IgniteCheckedException
{
return prepFut.get();
}
});
}

return prepFut;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void rollback() throws IgniteCheckedException { @Override public void rollback() throws IgniteCheckedException {
try { try {
Expand Down

0 comments on commit a238ce3

Please sign in to comment.