Skip to content

Commit

Permalink
Merge branches 'ignite-96' and 'sprint-1' of https://git-wip-us.apach…
Browse files Browse the repository at this point in the history
…e.org/repos/asf/incubator-ignite into ignite-96

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
	modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
  • Loading branch information
Yakov Zhdanov committed Feb 10, 2015
1 parent c3ea6fd commit e9b822d
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 170 deletions.
Expand Up @@ -24,20 +24,15 @@
import org.apache.ignite.events.*; import org.apache.ignite.events.*;
import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;


import javax.cache.expiry.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*; import java.util.concurrent.locks.*;


import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheDistributionMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
Expand Down Expand Up @@ -207,165 +202,167 @@ public void testReaderTtlNoTx() throws Exception {
* @throws Exception * @throws Exception
*/ */
private void checkReaderTtl(boolean inTx) throws Exception { private void checkReaderTtl(boolean inTx) throws Exception {
int ttl = 1000; assert false : "ignite-96";


final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); // int ttl = 1000;

//
final GridCache<String, Integer> c = cache(); // final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));

//
final String key = primaryKeysForCache(jcache(), 1).get(0); // final GridCache<String, Integer> c = cache();

//
c.put(key, 1); // final String key = primaryKeysForCache(jcache(), 1).get(0);

//
CacheEntry<String, Integer> entry = c.entry(key); // c.put(key, 1);

//
assert entry != null; // Cache.Entry<String, Integer> entry = c.entry(key);

//
assertEquals(0, entry.timeToLive()); // assert entry != null;
assertEquals(0, entry.expirationTime()); //

// assertEquals(0, entry.timeToLive());
long startTime = System.currentTimeMillis(); // assertEquals(0, entry.expirationTime());

//
int fullIdx = nearIdx == 0 ? 1 : 0; // long startTime = System.currentTimeMillis();

//
// Now commit transaction and check that ttl and expire time have been saved. // int fullIdx = nearIdx == 0 ? 1 : 0;
IgniteTx tx = inTx ? grid(fullIdx).transactions().txStart() : null; //

// // Now commit transaction and check that ttl and expire time have been saved.
try { // IgniteTx tx = inTx ? grid(fullIdx).transactions().txStart() : null;
jcache(fullIdx).withExpiryPolicy(expiry).put(key, 1); //

// try {
if (tx != null) // jcache(fullIdx).withExpiryPolicy(expiry).put(key, 1);
tx.commit(); //
} // if (tx != null)
finally { // tx.commit();
if (tx != null) // }
tx.close(); // finally {
} // if (tx != null)

// tx.close();
long[] expireTimes = new long[gridCount()]; // }

//
for (int i = 0; i < gridCount(); i++) { // long[] expireTimes = new long[gridCount()];
CacheEntry<String, Integer> curEntry = cache(i).entry(key); //

// for (int i = 0; i < gridCount(); i++) {
if (curEntry.primary() || curEntry.backup() || i == nearIdx) { // CacheEntry<String, Integer> curEntry = cache(i).entry(key);
assertEquals(ttl, curEntry.timeToLive()); //

// if (curEntry.primary() || curEntry.backup() || i == nearIdx) {
assert curEntry.expirationTime() > startTime; // assertEquals(ttl, curEntry.timeToLive());

//
expireTimes[i] = curEntry.expirationTime(); // assert curEntry.expirationTime() > startTime;
} //
} // expireTimes[i] = curEntry.expirationTime();

// }
// One more update from the same cache entry to ensure that expire time is shifted forward. // }
U.sleep(100); //

// // One more update from the same cache entry to ensure that expire time is shifted forward.
tx = inTx ? grid(fullIdx).transactions().txStart() : null; // U.sleep(100);

//
try { // tx = inTx ? grid(fullIdx).transactions().txStart() : null;
jcache(fullIdx).withExpiryPolicy(expiry).put(key, 2); //

// try {
if (tx != null) // jcache(fullIdx).withExpiryPolicy(expiry).put(key, 2);
tx.commit(); //
} // if (tx != null)
finally { // tx.commit();
if (tx != null) // }
tx.close(); // finally {
} // if (tx != null)

// tx.close();
for (int i = 0; i < gridCount(); i++) { // }
CacheEntry<String, Integer> curEntry = cache(i).entry(key); //

// for (int i = 0; i < gridCount(); i++) {
if (curEntry.primary() || curEntry.backup() || i == nearIdx) { // CacheEntry<String, Integer> curEntry = cache(i).entry(key);
assertEquals(ttl, curEntry.timeToLive()); //

// if (curEntry.primary() || curEntry.backup() || i == nearIdx) {
assert curEntry.expirationTime() > expireTimes[i]; // assertEquals(ttl, curEntry.timeToLive());

//
expireTimes[i] = curEntry.expirationTime(); // assert curEntry.expirationTime() > expireTimes[i];
} //
} // expireTimes[i] = curEntry.expirationTime();

// }
// And one more update to ensure that ttl is not changed and expire time is not shifted forward. // }
U.sleep(100); //

// // And one more update to ensure that ttl is not changed and expire time is not shifted forward.
tx = inTx ? grid(fullIdx).transactions().txStart() : null; // U.sleep(100);

//
try { // tx = inTx ? grid(fullIdx).transactions().txStart() : null;
jcache(fullIdx).put(key, 4); //
} // try {
finally { // jcache(fullIdx).put(key, 4);
if (tx != null) // }
tx.commit(); // finally {
} // if (tx != null)

// tx.commit();
for (int i = 0; i < gridCount(); i++) { // }
CacheEntry<String, Integer> curEntry = cache(i).entry(key); //

// for (int i = 0; i < gridCount(); i++) {
if (curEntry.primary() || curEntry.backup() || i == nearIdx) { // CacheEntry<String, Integer> curEntry = cache(i).entry(key);
assertEquals(ttl, curEntry.timeToLive()); //
assertEquals(expireTimes[i], curEntry.expirationTime()); // if (curEntry.primary() || curEntry.backup() || i == nearIdx) {
} // assertEquals(ttl, curEntry.timeToLive());
} // assertEquals(expireTimes[i], curEntry.expirationTime());

// }
// Avoid reloading from store. // }
map.remove(key); //

// // Avoid reloading from store.
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { // map.remove(key);
@SuppressWarnings("unchecked") //
@Override public boolean applyx() throws IgniteCheckedException { // assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
try { // @SuppressWarnings("unchecked")
Integer val = c.get(key); // @Override public boolean applyx() throws IgniteCheckedException {

// try {
if (val != null) { // Integer val = c.get(key);
info("Value is in cache [key=" + key + ", val=" + val + ']'); //

// if (val != null) {
return false; // info("Value is in cache [key=" + key + ", val=" + val + ']');
} //

// return false;
// Get "cache" field from GridCacheProxyImpl. // }
GridCacheAdapter c0 = GridTestUtils.getFieldValue(c, "cache"); //

// // Get "cache" field from GridCacheProxyImpl.
if (!c0.context().deferredDelete()) { // GridCacheAdapter c0 = GridTestUtils.getFieldValue(c, "cache");
GridCacheEntryEx e0 = c0.peekEx(key); //

// if (!c0.context().deferredDelete()) {
return e0 == null || (e0.rawGet() == null && e0.valueBytes() == null); // GridCacheEntryEx e0 = c0.peekEx(key);
} //
else // return e0 == null || (e0.rawGet() == null && e0.valueBytes() == null);
return true; // }
} // else
catch (GridCacheEntryRemovedException e) { // return true;
throw new RuntimeException(e); // }
} // catch (GridCacheEntryRemovedException e) {
} // throw new RuntimeException(e);
}, Math.min(ttl * 10, getTestTimeout()))); // }

// }
// Ensure that old TTL and expire time are not longer "visible". // }, Math.min(ttl * 10, getTestTimeout())));
entry = c.entry(key); //

// // Ensure that old TTL and expire time are not longer "visible".
assert entry.get() == null; // entry = c.entry(key);

//
assertEquals(0, entry.timeToLive()); // assert entry.get() == null;
assertEquals(0, entry.expirationTime()); //

// assertEquals(0, entry.timeToLive());
// Ensure that next update will not pick old expire time. // assertEquals(0, entry.expirationTime());

//
tx = inTx ? c.txStart() : null; // // Ensure that next update will not pick old expire time.

//
try { // tx = inTx ? c.txStart() : null;
entry.set(10); //
} // try {
finally { // entry.set(10);
if (tx != null) // }
tx.commit(); // finally {
} // if (tx != null)

// tx.commit();
U.sleep(2000); // }

//
entry = c.entry(key); // U.sleep(2000);

//
assertEquals((Integer)10, entry.get()); // entry = c.entry(key);

//
assertEquals(0, entry.timeToLive()); // assertEquals((Integer)10, entry.get());
assertEquals(0, entry.expirationTime()); //
// assertEquals(0, entry.timeToLive());
// assertEquals(0, entry.expirationTime());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.testframework.junits.common.*;


import javax.cache.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;


Expand Down Expand Up @@ -195,13 +196,11 @@ private void checkProjectionFilter(GridCache cache, int expCnt) throws Exception
* @param cache Cache. * @param cache Cache.
* @return Projection. * @return Projection.
*/ */
protected CacheProjection createProjectionForFilter(GridCache cache) { protected CacheProjection createProjectionForFilter(final GridCache cache) {
return cache.projection(new IgnitePredicate<CacheEntry<Key, Person>>() { return cache.projection(new IgnitePredicate<Cache.Entry<Key, Person>>() {
@Override public boolean apply(CacheEntry<Key, Person> e) { @Override public boolean apply(Cache.Entry<Key, Person> e) {
Key key = e.getKey(); Key key = e.getKey();
Person val = e.peek(); Person val = e.getValue();

assertNotNull(e.version());


assertEquals(key.id, (Integer)val.salary); assertEquals(key.id, (Integer)val.salary);


Expand Down

0 comments on commit e9b822d

Please sign in to comment.