Skip to content

Commit

Permalink
GG-9655 - Fixing tests after merge.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Jan 31, 2015
1 parent 61c102c commit 9a995a3
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 109 deletions.
Expand Up @@ -451,6 +451,7 @@ private void addMapping0(
entry.valueBytes(e.valueBytes());
entry.ttl(e.ttl());
entry.filters(e.filters());
entry.expiry(e.expiry());
entry.drExpireTime(e.drExpireTime());
entry.drVersion(e.drVersion());
}
Expand Down
Expand Up @@ -116,6 +116,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
/** Keys that did not pass the filter. */
private Collection<IgniteTxKey<K>> filterFailedKeys;

/** Keys that should be locked. */
private GridConcurrentHashSet<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>();

/** Locks ready flag. */
private volatile boolean locksReady;

/**
* Empty constructor required for {@link Externalizable}.
*/
Expand Down Expand Up @@ -209,9 +215,9 @@ public IgniteUuid nearMiniId() {
if (log.isDebugEnabled())
log.debug("Transaction future received owner changed callback: " + entry);

boolean ret = tx.hasWriteKey(entry.txKey());
boolean rmv = lockKeys.remove(entry.txKey());

return ret && mapIfLocked();
return rmv && mapIfLocked();
}

/** {@inheritDoc} */
Expand All @@ -235,44 +241,7 @@ GridDhtTxLocalAdapter<K, V> tx() {
* @return {@code True} if all locks are owned.
*/
private boolean checkLocks() {
for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) {
while (true) {
GridCacheEntryEx<K, V> cached = txEntry.cached();

try {
if (txEntry.explicitVersion() == null) {
// Don't compare entry against itself.
if (!cached.lockedLocally(tx.xidVersion())) {
if (log.isDebugEnabled())
log.debug("Transaction entry is not locked by transaction (will wait) [entry=" +
cached + ", tx=" + tx + ']');

return false;
}
}
else {
if (!cached.lockedBy(txEntry.explicitVersion())) {
if (log.isDebugEnabled())
log.debug("Transaction entry is not locked by explicit version (will wait) [entry=" +
cached + ", tx=" + tx + ']');

return false;
}
}

break; // While.
}
// Possible if entry cached within transaction is obsolete.
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);

txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
}
}
}

return true;
return locksReady && lockKeys.isEmpty();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -460,13 +429,26 @@ private void readyLocks() {
Collections.singletonList(tx.groupLockEntry()) : writes;

for (IgniteTxEntry<K, V> txEntry : checkEntries) {
if (txEntry.cached().isLocal())
GridCacheContext<K, V> cacheCtx = txEntry.context();

if (cacheCtx.isLocal())
continue;

while (true) {
GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();

if (entry == null) {
entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());

txEntry.cached(entry, txEntry.keyBytes());
}

if (tx.optimistic() && txEntry.explicitVersion() == null)
lockKeys.add(txEntry.txKey());

while (true) {
try {
assert txEntry.explicitVersion() == null || entry.lockedBy(txEntry.explicitVersion());

GridCacheMvccCandidate<K> c = entry.readyLock(tx.xidVersion());

if (log.isDebugEnabled())
Expand All @@ -479,10 +461,14 @@ private void readyLocks() {
if (log.isDebugEnabled())
log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);

txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());

txEntry.cached(entry, txEntry.keyBytes());
}
}
}

locksReady = true;
}

/**
Expand Down
Expand Up @@ -301,11 +301,6 @@ public GridCacheVersion dhtVersion(int idx) {
return dhtVers[idx];
}

/** {@inheritDoc} */
@Override protected boolean transferExpiryPolicy() {
return true;
}

/**
* @return TTL for read operation.
*/
Expand Down
Expand Up @@ -82,7 +82,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
private boolean colocatedLocallyMapped;

/** Info for entries accessed locally in optimistic transaction. */
private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
private Map<IgniteTxKey<K>, IgniteCacheExpiryPolicy> accessMap;

/**
* Empty constructor required for {@link Externalizable}.
Expand Down Expand Up @@ -560,8 +560,7 @@ void readyNearLocks(GridDistributedTxMapping<K, V> mapping,
while (true) {
GridCacheContext<K, V> cacheCtx = txEntry.cached().context();

if (!cacheCtx.isNear())
break;
assert cacheCtx.isNear();

GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();

Expand Down Expand Up @@ -1156,7 +1155,7 @@ public IgniteInternalFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K,

/** {@inheritDoc} */
@Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
IgniteTxKey key,
IgniteTxKey<K> key,
@Nullable ExpiryPolicy expiryPlc)
{
assert optimistic();
Expand Down Expand Up @@ -1187,7 +1186,7 @@ public IgniteInternalFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K,
*/
private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys) {
if (accessMap != null) {
for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
for (Map.Entry<IgniteTxKey<K>, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key()))
return e.getValue();
}
Expand All @@ -1203,7 +1202,7 @@ private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Co
if (accessMap != null) {
assert optimistic();

for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
for (Map.Entry<IgniteTxKey<K>, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
if (e.getValue().entries() != null) {
GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId());

Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;

import org.apache.ignite.*;
import org.apache.ignite.client.util.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
Expand Down Expand Up @@ -78,6 +79,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
/** Full information about transaction nodes mapping. */
private GridDhtTxMapping<K, V> txMapping;

/** */
private Collection<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>();

/**
* Empty constructor required for {@link Externalizable}.
*/
Expand Down Expand Up @@ -128,6 +132,8 @@ public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNear
log.debug("Transaction future received owner changed callback: " + entry);

if (entry.context().isNear() && owner != null && tx.hasWriteKey(entry.txKey())) {
lockKeys.remove(entry.txKey());

// This will check for locks.
onDone();

Expand Down Expand Up @@ -213,45 +219,16 @@ void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping<
* @return {@code True} if all locks are owned.
*/
private boolean checkLocks() {
Collection<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ?
Collections.singletonList(tx.groupLockEntry()) :
tx.writeEntries();

for (IgniteTxEntry<K, V> txEntry : checkEntries) {
// Wait for near locks only.
if (!txEntry.context().isNear())
continue;

while (true) {
GridCacheEntryEx<K, V> cached = txEntry.cached();

try {
GridCacheVersion ver = txEntry.explicitVersion() != null ?
txEntry.explicitVersion() : tx.xidVersion();
boolean locked = lockKeys.isEmpty();

// If locks haven't been acquired yet, keep waiting.
if (!cached.lockedBy(ver)) {
if (log.isDebugEnabled())
log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cached +
", tx=" + tx + ']');

return false;
}

break; // While.
}
// Possible if entry cached within transaction is obsolete.
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);

txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
}
}
if (locked) {
if (log.isDebugEnabled())
log.debug("All locks are acquired for near prepare future: " + this);
}
else {
if (log.isDebugEnabled())
log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
}

if (log.isDebugEnabled())
log.debug("All locks are acquired for near prepare future: " + this);

return true;
}
Expand Down Expand Up @@ -564,7 +541,7 @@ else if (write.context().isColocated())
*
*/
private void preparePessimistic() {
Map<ClusterNode, GridDistributedTxMapping<K, V>> mappings = new HashMap<>();
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping<K, V>> mappings = new HashMap<>();

long topVer = tx.topologyVersion();

Expand All @@ -577,12 +554,18 @@ private void preparePessimistic() {

ClusterNode primary = F.first(nodes);

GridDistributedTxMapping<K, V> nodeMapping = mappings.get(primary);
boolean near = cacheCtx.isNear();

IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);

GridDistributedTxMapping<K, V> nodeMapping = mappings.get(key);

if (nodeMapping == null) {
nodeMapping = new GridDistributedTxMapping<>(primary);

mappings.put(primary, nodeMapping);
nodeMapping.near(cacheCtx.isNear());

mappings.put(key, nodeMapping);
}

txEntry.nodeId(primary.id());
Expand Down Expand Up @@ -663,13 +646,6 @@ private void preparePessimistic() {
tx.addDhtVersion(m.node().id(), dhtTx.xidVersion());

m.dhtVersion(dhtTx.xidVersion());

GridCacheVersion min = dhtTx.minVersion();

IgniteTxManager<K, V> tm = cctx.tm();

tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
tm.committedVersions(min), tm.rolledbackVersions(min));
}

tx.implicitSingleResult(dhtTx.implicitSingleResult());
Expand Down Expand Up @@ -821,8 +797,9 @@ private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMappin

IgniteTxManager<K, V> tm = cctx.tm();

tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
tm.committedVersions(min), tm.rolledbackVersions(min));
if (m.near())
tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
tm.committedVersions(min), tm.rolledbackVersions(min));
}

// Continue prepare before completing the future.
Expand Down Expand Up @@ -902,6 +879,9 @@ else if (!cacheCtx.isLocal())
entry.nodeId(primary.id());

if (cacheCtx.isNear()) {
if (entry.explicitVersion() == null)
lockKeys.add(entry.txKey());

while (true) {
try {
GridNearCacheEntry<K, V> cached = (GridNearCacheEntry<K, V>)entry.cached();
Expand Down Expand Up @@ -1083,7 +1063,8 @@ void onResult(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {

m.dhtVersion(res.dhtVersion());

tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
if (m.near())
tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
}

// Proceed prepare before finishing mini future.
Expand Down
Expand Up @@ -1360,9 +1360,11 @@ private Collection<K> enlistRead(
* @param expiryPlc Expiry policy.
* @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
*/
protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
IgniteTxKey key,
@Nullable ExpiryPolicy expiryPlc) {
protected IgniteCacheExpiryPolicy accessPolicy(
GridCacheContext ctx,
IgniteTxKey<K> key,
@Nullable ExpiryPolicy expiryPlc
) {
return null;
}

Expand Down
Expand Up @@ -4477,10 +4477,13 @@ private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {

try {
grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 1);

if (tx != null)
tx.commit();
}
finally {
if (tx != null)
tx.commit();
tx.close();
}

long[] expireTimes = new long[gridCount()];
Expand Down

0 comments on commit 9a995a3

Please sign in to comment.