Skip to content

Commit

Permalink
Ignite-9655-merge - Fixing tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Feb 14, 2015
1 parent bcd76e1 commit 109dbe7
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 104 deletions.
Expand Up @@ -3988,6 +3988,7 @@ public IgniteInternalFuture<?> loadAll(
*/ */
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException { private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
ldr.allowOverwrite(true);
ldr.skipStore(true); ldr.skipStore(true);


final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize()); final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize());
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.marshaller.*; import org.apache.ignite.marshaller.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;
Expand All @@ -40,6 +41,7 @@
/** /**
* Shared context. * Shared context.
*/ */
@GridToStringExclude
public class GridCacheSharedContext<K, V> { public class GridCacheSharedContext<K, V> {
/** Kernal context. */ /** Kernal context. */
private GridKernalContext kernalCtx; private GridKernalContext kernalCtx;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.tostring.*;
Expand Down Expand Up @@ -657,12 +658,14 @@ else if (op == READ) {
} }
} }
catch (Throwable ex) { catch (Throwable ex) {
uncommit();

state(UNKNOWN); state(UNKNOWN);


// In case of error, we still make the best effort to commit, // In case of error, we still make the best effort to commit,
// as there is no way to rollback at this point. // as there is no way to rollback at this point.
err = ex instanceof IgniteCheckedException ? (IgniteCheckedException)ex : err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
new IgniteCheckedException("Commit produced a runtime exception: " + this, ex); "(all transaction entries will be invalidated): " + CU.txString(this), ex);
} }
} }
} }
Expand Down Expand Up @@ -733,7 +736,7 @@ public void forceCommit() throws IgniteCheckedException {
try { try {
// Note that we don't evict near entries here - // Note that we don't evict near entries here -
// they will be deleted by their corresponding transactions. // they will be deleted by their corresponding transactions.
if (state(ROLLING_BACK)) { if (state(ROLLING_BACK) || state() == UNKNOWN) {
cctx.tm().rollbackTx(this); cctx.tm().rollbackTx(this);


state(ROLLED_BACK); state(ROLLED_BACK);
Expand Down
Expand Up @@ -302,7 +302,7 @@ private void onEntriesLocked() {
V val = cached.innerGet( V val = cached.innerGet(
tx, tx,
/*swap*/true, /*swap*/true,
/*read through*/retVal || hasFilters, /*read through*/(retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue(),
/*fail fast*/false, /*fail fast*/false,
/*unmarshal*/true, /*unmarshal*/true,
/*metrics*/retVal, /*metrics*/retVal,
Expand Down Expand Up @@ -381,39 +381,7 @@ private void onEntriesLocked() {
* @param t Error. * @param t Error.
*/ */
public void onError(Throwable t) { public void onError(Throwable t) {
if (err.compareAndSet(null, t)) { onDone(tx, t);
tx.setRollbackOnly();

// TODO: GG-4005:
// TODO: as an improvement, at some point we must rollback right away.
// TODO: However, in this case need to make sure that reply is sent back
// TODO: even for non-existing transactions whenever finish request comes in.
// try {
// tx.rollback();
// }
// catch (IgniteCheckedException ex) {
// U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
// }
//
try {
// Send reply back to near node.
GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(
tx.nearXidVersion(),
tx.nearFutureId(),
nearMiniId,
tx.xidVersion(),
Collections.<Integer>emptySet(),
ret,
t);

sendPrepareResponse(res);
}
catch (IgniteCheckedException ignore) {
tx.rollbackAsync();
}

onComplete();
}
} }


/** /**
Expand Down Expand Up @@ -609,22 +577,26 @@ private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws Ign
*/ */
private GridNearTxPrepareResponse<K, V> createPrepareResponse() { private GridNearTxPrepareResponse<K, V> createPrepareResponse() {
// Send reply back to originating near node. // Send reply back to originating near node.
Throwable prepErr = err.get();

GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>( GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(
tx.nearXidVersion(), tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(), tx.colocated() ? tx.xid() : tx.nearFutureId(),
nearMiniId == null ? tx.xid() : nearMiniId, nearMiniId == null ? tx.xid() : nearMiniId,
tx.xidVersion(), tx.xidVersion(),
tx.invalidPartitions(), tx.invalidPartitions(),
ret, ret,
err.get()); prepErr);


addDhtValues(res); if (prepErr == null) {
addDhtValues(res);


GridCacheVersion min = tx.minVersion(); GridCacheVersion min = tx.minVersion();


res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));


res.pending(localDhtPendingVersions(tx.writeEntries(), min)); res.pending(localDhtPendingVersions(tx.writeEntries(), min));
}


res.filterFailedKeys(filterFailedKeys); res.filterFailedKeys(filterFailedKeys);


Expand Down
Expand Up @@ -134,13 +134,15 @@ public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNear
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Transaction future received owner changed callback: " + entry); log.debug("Transaction future received owner changed callback: " + entry);


if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { if (tx.optimistic()) {
lockKeys.remove(entry.txKey()); if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
lockKeys.remove(entry.txKey());


// This will check for locks. // This will check for locks.
onDone(); onDone();


return true; return true;
}
} }


return false; return false;
Expand Down
Expand Up @@ -437,6 +437,28 @@ protected GridCacheStoreManager<K, V> store() {
return null; return null;
} }


/**
* Uncommits transaction by invalidating all of its entries. Courtesy to minimize inconsistency.
*/
@SuppressWarnings({"CatchGenericClass"})
protected void uncommit() {
for (IgniteTxEntry<K, V> e : writeMap().values()) {
try {
GridCacheEntryEx<K, V> Entry = e.cached();

if (e.op() != NOOP)
Entry.invalidate(null, xidVer);
}
catch (Throwable t) {
U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);

break;
}
}

cctx.tm().uncommitTx(this);
}

/** /**
* This method uses unchecked assignment to cast group lock key entry to transaction generic signature. * This method uses unchecked assignment to cast group lock key entry to transaction generic signature.
* *
Expand Down Expand Up @@ -1269,7 +1291,7 @@ public String resolveTaskName() {
* @param newVer New version. * @param newVer New version.
* @param old Old entry. * @param old Old entry.
* @return Tuple with adjusted operation type and conflict context. * @return Tuple with adjusted operation type and conflict context.
* @throws org.apache.ignite.IgniteCheckedException In case of eny exception. * @throws IgniteCheckedException In case of eny exception.
* @throws GridCacheEntryRemovedException If entry got removed. * @throws GridCacheEntryRemovedException If entry got removed.
*/ */
protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> conflictResolve( protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> conflictResolve(
Expand Down
Expand Up @@ -25,12 +25,9 @@
import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
Expand Down Expand Up @@ -700,6 +697,10 @@ else if (e instanceof IgniteTxOptimisticCheckedException) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Optimistic failure for remote transaction (will rollback): " + req); log.debug("Optimistic failure for remote transaction (will rollback): " + req);
} }
else if (e instanceof IgniteTxHeuristicCheckedException) {
U.warn(log, "Failed to commit transaction (all transaction entries were invalidated): " +
CU.txString(dhtTx));
}
else else
U.error(log, "Failed to process prepare request: " + req, e); U.error(log, "Failed to process prepare request: " + req, e);


Expand Down Expand Up @@ -838,7 +839,7 @@ else if (log.isDebugEnabled())
protected void finish( protected void finish(
UUID nodeId, UUID nodeId,
GridDistributedTxRemoteAdapter<K, V> tx, GridDistributedTxRemoteAdapter<K, V> tx,
GridDhtTxPrepareRequest<K, V> req) { GridDhtTxPrepareRequest<K, V> req) throws IgniteTxHeuristicCheckedException {
assert tx != null : "No transaction for one-phase commit prepare request: " + req; assert tx != null : "No transaction for one-phase commit prepare request: " + req;


try { try {
Expand All @@ -850,6 +851,10 @@ protected void finish(


tx.commit(); tx.commit();
} }
catch (IgniteTxHeuristicCheckedException e) {
// Just rethrow this exception. Transaction was already uncommitted.
throw e;
}
catch (Throwable e) { catch (Throwable e) {
U.error(log, "Failed committing transaction [tx=" + tx + ']', e); U.error(log, "Failed committing transaction [tx=" + tx + ']', e);


Expand Down
Expand Up @@ -448,28 +448,6 @@ assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode
", tx=" + this + ']'; ", tx=" + this + ']';
} }


/**
* Uncommits transaction by invalidating all of its entries.
*/
@SuppressWarnings({"CatchGenericClass"})
private void uncommit() {
for (IgniteTxEntry<K, V> e : writeMap().values()) {
try {
GridCacheEntryEx<K, V> Entry = e.cached();

if (e.op() != NOOP)
Entry.invalidate(null, xidVer);
}
catch (Throwable t) {
U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);

break;
}
}

cctx.tm().uncommitTx(this);
}

/** /**
* Gets cache entry for given key. * Gets cache entry for given key.
* *
Expand Down Expand Up @@ -2239,7 +2217,7 @@ else if (drRmvMap != null) {
if (missedForLoad != null) { if (missedForLoad != null) {
IgniteInternalFuture<Boolean> fut = loadMissing( IgniteInternalFuture<Boolean> fut = loadMissing(
cacheCtx, cacheCtx,
/*read through*/true, /*read through*/cacheCtx.config().isLoadPreviousValue(),
/*async*/true, /*async*/true,
missedForLoad, missedForLoad,
deserializePortables(cacheCtx), deserializePortables(cacheCtx),
Expand Down
Expand Up @@ -90,7 +90,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception { @Override protected void beforeTest() throws Exception {
assert jcache().unwrap(Ignite.class).transactions().tx() == null; assert jcache().unwrap(Ignite.class).transactions().tx() == null;
assert jcache().localSize() == 0; assertEquals(0, jcache().localSize());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -566,8 +566,7 @@ public void testReload() throws Exception {


checkLastMethod("load"); checkLastMethod("load");


assert val != null; assertEquals("reloaded-" + i, val);
assert val.equals("reloaded-" + i);


store.resetLastMethod(); store.resetLastMethod();


Expand Down

0 comments on commit 109dbe7

Please sign in to comment.