Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
avinogradov committed Apr 21, 2015
1 parent 5619659 commit 842bf40
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
Expand Up @@ -471,9 +471,8 @@ private void leaveBusy() {
/** /**
* @return Exchange futures. * @return Exchange futures.
*/ */
@SuppressWarnings( {"unchecked", "RedundantCast"}) public List<GridDhtPartitionsExchangeFuture> exchangeFutures() {
public List<IgniteInternalFuture<?>> exchangeFutures() { return exchFuts.values();
return (List<IgniteInternalFuture<?>>)(List)exchFuts.values();
} }


/** /**
Expand Down
Expand Up @@ -528,6 +528,17 @@ void map() {
topVer = tx.topologyVersionSnapshot(); topVer = tx.topologyVersionSnapshot();


if (topVer != null) { if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
if (!fut.isCacheTopologyValid(cctx)) {
onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
cctx.name()));

return;
}
}
}

// Continue mapping on the same topology version as it was before. // Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer); this.topVer.compareAndSet(null, topVer);


Expand Down
Expand Up @@ -660,6 +660,17 @@ void map() {
topVer = tx.topologyVersionSnapshot(); topVer = tx.topologyVersionSnapshot();


if (topVer != null) { if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
if (!fut.isCacheTopologyValid(cctx)) {
onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
cctx.name()));

return;
}
}
}

// Continue mapping on the same topology version as it was before. // Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer); this.topVer.compareAndSet(null, topVer);


Expand Down
Expand Up @@ -116,6 +116,7 @@ protected void putValid(String cacheName) {


/** /**
* Commits with error. * Commits with error.
*
* @param tx transaction. * @param tx transaction.
*/ */
protected void commitFailed(Transaction tx) { protected void commitFailed(Transaction tx) {
Expand All @@ -129,18 +130,20 @@ protected void commitFailed(Transaction tx) {
} }


/** /**
* Removes key-value * Removes key-value.
* *
* @param cacheName cache name. * @param cacheName cache name.
*/ */
public void remove(String cacheName) { public void remove(String cacheName) {
assert grid(0).cache(cacheName).get(KEY_VALUE) != null;

grid(0).cache(cacheName).remove(KEY_VALUE); grid(0).cache(cacheName).remove(KEY_VALUE);
} }


/** /**
* Asserts that cache doesn't contains key. * Asserts that cache doesn't contains key.
* *
* @param cacheName * @param cacheName cache name.
*/ */
public void assertEmpty(String cacheName) { public void assertEmpty(String cacheName) {
assert grid(0).cache(cacheName).get(KEY_VALUE) == null; assert grid(0).cache(cacheName).get(KEY_VALUE) == null;
Expand Down
Expand Up @@ -59,15 +59,15 @@ public abstract class IgniteTopologyValidatorAbstractTxCacheTest extends IgniteT


assertEmpty(null); // rolled back assertEmpty(null); // rolled back
assertEmpty(CACHE_NAME_1); // rolled back assertEmpty(CACHE_NAME_1); // rolled back
assertEmpty(CACHE_NAME_2); // rolled back


// try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
// putValid(null); putValid(null);
// putInvalid(CACHE_NAME_1); putInvalid(CACHE_NAME_1);
// } }
//
// assertEmpty(null); // rolled back
// assertEmpty(CACHE_NAME_1); // rolled back


assertEmpty(null); // rolled back
assertEmpty(CACHE_NAME_1); // rolled back


startGrid(1); startGrid(1);


Expand All @@ -87,6 +87,14 @@ public abstract class IgniteTopologyValidatorAbstractTxCacheTest extends IgniteT


startGrid(2); startGrid(2);


try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
putValid(null);
putInvalid(CACHE_NAME_1);
}

assertEmpty(null); // rolled back
assertEmpty(CACHE_NAME_1); // rolled back

try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) { try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
putValid(CACHE_NAME_1); putValid(CACHE_NAME_1);
commitFailed(tx); commitFailed(tx);
Expand All @@ -96,5 +104,22 @@ public abstract class IgniteTopologyValidatorAbstractTxCacheTest extends IgniteT
putInvalid(CACHE_NAME_1); putInvalid(CACHE_NAME_1);
} }


try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
putValid(null);
putValid(CACHE_NAME_2);
tx.commit();
}

remove(null);
remove(CACHE_NAME_2);

try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
putValid(null);
putValid(CACHE_NAME_2);
tx.commit();
}

remove(null);
remove(CACHE_NAME_2);
} }
} }

0 comments on commit 842bf40

Please sign in to comment.