Skip to content

Commit

Permalink
GG-9655 - Fixing tests after merge.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Jan 31, 2015
1 parent 4fde8d3 commit 9215a07
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 13 deletions.
Expand Up @@ -44,6 +44,9 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
/** Success flag. */
private volatile boolean success;

/** */
private volatile boolean invokeRes;

/**
* Empty constructor.
*/
Expand All @@ -69,6 +72,19 @@ public GridCacheReturn(V v, boolean success) {
this.success = success;
}

/**
*
* @param v Value.
* @param success Success flag.
*/
public GridCacheReturn(V v, boolean success, boolean invokeRes) {
assert !invokeRes || v instanceof Map;

this.v = v;
this.success = success;
this.invokeRes = invokeRes;
}

/**
* @return Value.
*/
Expand All @@ -85,6 +101,13 @@ public boolean hasValue() {
return v != null;
}

/**
* @return If return is invoke result.
*/
public boolean invokeResult() {
return invokeRes;
}

/**
* @param v Value.
* @return This instance for chaining.
Expand Down Expand Up @@ -134,6 +157,8 @@ public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<
assert key != null;
assert res != null;

invokeRes = true;

HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v;

if (resMap == null) {
Expand All @@ -145,6 +170,26 @@ public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<
resMap.put(key, res);
}

/**
* @param other Other result to merge with.
*/
public synchronized void mergeEntryProcessResults(GridCacheReturn<V> other) {
assert invokeRes || v == null : "Invalid state to merge: " + this;
assert other.invokeRes;

invokeRes = true;

HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v;

if (resMap == null) {
resMap = new HashMap<>();

v = (V)resMap;
}

resMap.putAll((Map<Object, EntryProcessorResult>)other.v);
}

/** {@inheritDoc} */
@Override public Object ggClassId() {
return GG_CLASS_ID;
Expand All @@ -154,13 +199,15 @@ public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(success);
out.writeObject(v);
out.writeBoolean(invokeRes);
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
success = in.readBoolean();
v = (V)in.readObject();
invokeRes = in.readBoolean();
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -265,7 +265,7 @@ private boolean checkLocks() {
/**
*
*/
private void checkFilters() {
private void onEntriesLocked() {
ret = new GridCacheReturn<>(null, true);

for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) {
Expand Down Expand Up @@ -769,7 +769,7 @@ private void prepare0() {
// We are holding transaction-level locks for entries here, so we can get next write version.
tx.writeVersion(cctx.versions().next(tx.topologyVersion()));

checkFilters();
onEntriesLocked();

{
Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>();
Expand Down
Expand Up @@ -1805,7 +1805,7 @@ else if (F.contains(readers, node.id())) // Reader became primary or backup.
if (retVal == null) {
computedMap = U.newHashMap(keys.size());

retVal = new GridCacheReturn<>((Object)computedMap, updRes.success());
retVal = new GridCacheReturn<>((Object)computedMap, updRes.success(), true);
}

computedMap.put(k, updRes.computedResult());
Expand Down
Expand Up @@ -230,7 +230,7 @@ private boolean checkLocks() {
log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
}

return true;
return locked;
}

/**
Expand Down Expand Up @@ -1046,8 +1046,7 @@ void onResult(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {
}
}

if (tx.implicitSingle())
tx.implicitSingleResult(res.returnValue());
tx.implicitSingleResult(res.returnValue());

for (IgniteTxKey<K> key : res.filterFailedKeys()) {
IgniteTxEntry<K, V> txEntry = tx.entry(key);
Expand Down
Expand Up @@ -282,7 +282,10 @@ protected IgniteTxLocalAdapter(
* @param ret Result.
*/
public void implicitSingleResult(GridCacheReturn<V> ret) {
implicitRes = ret;
if (ret.invokeResult())
implicitRes.mergeEntryProcessResults(ret);
else
implicitRes = ret;
}

/**
Expand Down Expand Up @@ -730,17 +733,32 @@ else if (cacheCtx.isNear() && txEntry.locallyMapped())

boolean evt = !isNearLocallyMapped(txEntry, false);

if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(true, false);

GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
true);

// For near local transactions we must record DHT version
// in order to keep near entries on backup nodes until
// backup remote transaction completes.
if (cacheCtx.isNear())
if (cacheCtx.isNear()) {
((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());

if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(true, false);
if (txEntry.op() == CREATE || txEntry.op() == UPDATE && txEntry.drExpireTime() == -1L) {
ExpiryPolicy expiry = txEntry.expiry();

GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
true);
if (expiry == null)
expiry = cacheCtx.expiry();

if (expiry != null) {
Duration duration = cached.hasValue() ?
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();

txEntry.ttl(CU.toTtl(duration));
}
}
}

GridCacheOperation op = res.get1();
V val = res.get2();
Expand Down
Expand Up @@ -5392,7 +5392,7 @@ public void testIgniteCacheIterator() throws Exception {

assertFalse(cache.iterator().hasNext());

final int SIZE = 20000;
final int SIZE = 5000;

Map<String, Integer> entries = new HashMap<>();

Expand Down

0 comments on commit 9215a07

Please sign in to comment.