Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Jan 20, 2015
1 parent 05ef302 commit f4b3995
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 16 deletions.
Expand Up @@ -27,12 +27,17 @@
*/
public interface CacheStoreSession {
/**
* @return Current cache transaction.
* @return Transaction belonging to current session.
*/
@Nullable public IgniteTx transaction();

/**
* @return Session properties.
* @return Current session properties.
*/
public <K, V> Map<K, V> properties();

/**
* @return Cache name.
*/
@Nullable public String cacheName();
}
Expand Up @@ -89,7 +89,7 @@ public GridCacheStoreManager(GridKernalContext ctx,

sesField.setAccessible(true);

sesField.set(cfgStore, new ThreadLocalSession());
sesField.set(cfgStore, new ThreadLocalSession(sesHolder));

sesEnabled = true;
}
Expand Down Expand Up @@ -499,7 +499,7 @@ public boolean putAllToStore(@Nullable IgniteTx tx, Map<K, IgniteBiTuple<V, Grid
log.debug("Storing values in cache store [map=" + map0 + ']');

// TODO IGNITE-42.
Collection<Cache.Entry<? extends K, ? extends Object>> entries = new ArrayList<>(map.size());
Collection<Cache.Entry<? extends K, ?>> entries = new ArrayList<>(map.size());

for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet())
entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1()));
Expand Down Expand Up @@ -701,13 +701,13 @@ private boolean initSession(@Nullable IgniteTx tx) {
ses = ((GridMetadataAware)tx).meta(SES_ATTR);

if (ses == null) {
ses = new SessionData(tx);
ses = new SessionData(tx, cctx.name());

((GridMetadataAware)tx).addMeta(SES_ATTR, ses);
}
}
else
ses = new SessionData(null);
ses = new SessionData(null, cctx.name());

sesHolder.set(ses);

Expand All @@ -721,14 +721,19 @@ private static class SessionData {
/** */
private final IgniteTx tx;

/** */
private final String cacheName;

/** */
private Map<Object, Object> props;

/**
* @param tx Current transaction.
* @param cacheName Cache name.
*/
private SessionData(@Nullable IgniteTx tx) {
private SessionData(@Nullable IgniteTx tx, @Nullable String cacheName) {
this.tx = tx;
this.cacheName = cacheName;
}

/**
Expand All @@ -747,12 +752,29 @@ private Map<Object, Object> properties() {

return props;
}

/**
* @return Cache name.
*/
private String cacheName() {
return cacheName;
}
}

/**
*
*/
private class ThreadLocalSession implements CacheStoreSession {
private static class ThreadLocalSession implements CacheStoreSession {
/** */
private final ThreadLocal<SessionData> sesHolder;

/**
* @param sesHolder Session holder.
*/
private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
this.sesHolder = sesHolder;
}

/** {@inheritDoc} */
@Nullable @Override public IgniteTx transaction() {
SessionData ses0 = sesHolder.get();
Expand All @@ -767,5 +789,12 @@ private class ThreadLocalSession implements CacheStoreSession {

return ses0 != null ? (Map<K1, V1>)ses0.properties() : null;
}

/** {@inheritDoc} */
@Nullable @Override public String cacheName() {
SessionData ses0 = sesHolder.get();

return ses0 != null ? ses0.cacheName() : null;
}
}
}
Expand Up @@ -85,7 +85,11 @@ protected int gridCount() {
cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
cacheCfg.setAtomicityMode(TRANSACTIONAL);

cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
cacheCfg.setCacheStoreFactory(new Factory<CacheStore<? super Object, ? super Object>>() {
@Override public CacheStore<? super Object, ? super Object> create() {
return store;
}
});
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);

Expand Down Expand Up @@ -141,6 +145,7 @@ public void testGroupLockPutOneKeyPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockPutOneKey(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -208,6 +213,7 @@ public void testGroupLockRemoveOneKeyPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockRemoveOneKey(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -288,6 +294,7 @@ public void testGroupLockGetOneKeyPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockGetOneKey(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -342,17 +349,24 @@ private void checkGroupLockGetOneKey(IgniteTxConcurrency concurrency) throws Exc
assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey));
}

/** @throws IgniteCheckedException */
/**
* @throws Exception If failed.
*/
public void testGroupLockWithExternalLockOptimistic() throws Exception {
checkGroupLockWithExternalLock(OPTIMISTIC);
}

/** @throws IgniteCheckedException */
/**
* @throws Exception If failed.
*/
public void testGroupLockWithExternalLockPessimistic() throws Exception {
checkGroupLockWithExternalLock(PESSIMISTIC);
}

/** @throws IgniteCheckedException */
/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockWithExternalLock(final IgniteTxConcurrency concurrency) throws Exception {
assert sanityCheckEnabled();

Expand Down Expand Up @@ -437,6 +451,7 @@ public void testSanityCheckDisabledPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkSanityCheckDisabled(final IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -500,6 +515,7 @@ public void testGroupPartitionLockPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupPartitionLock(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -624,6 +640,8 @@ public void testGetPutEmptyCacheOptimisticRepeatableRead() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @param isolation Transaction isolation mode.
* @throws Exception If failed.
*/
private void checkGetPut(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
Expand Down Expand Up @@ -683,6 +701,8 @@ private void checkGetPut(IgniteTxConcurrency concurrency, IgniteTxIsolation isol
}

/**
* @param concurrency Transaction concurrency mode.
* @param isolation Transaction isolation mode.
* @throws Exception If failed.
*/
private void checkGetPutEmptyCache(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
Expand Down Expand Up @@ -764,6 +784,8 @@ public void testGetRemovePessimisticRepeatableRead() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @param isolation Transaction isolation mode.
* @throws Exception If failed.
*/
private void checkGetRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
Expand Down Expand Up @@ -844,6 +866,7 @@ public void testGetAfterPut() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGetAfterPut(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -925,6 +948,7 @@ public void testGetRepeatableReadPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGetRepeatableRead(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -953,6 +977,7 @@ public void testGroupLockPutWrongKeyPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockPutWrongKey(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -988,6 +1013,7 @@ public void testGroupLockRemoveWrongKeyPessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockRemoveWrongKey(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -1094,6 +1120,7 @@ public void testGroupLockWriteThroughBatchUpdatePessimistic() throws Exception {
}

/**
* @param concurrency Transaction concurrency mode.
* @throws Exception If failed.
*/
private void checkGroupLockWriteThrough(IgniteTxConcurrency concurrency) throws Exception {
Expand Down Expand Up @@ -1268,15 +1295,15 @@ private static class TestStore extends CacheStoreAdapter<Object, Object> {
}

/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<? extends Object, ? extends Object>> entries) {
for (Cache.Entry<? extends Object, ? extends Object> e : entries)
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
for (Cache.Entry<?, ?> e : entries)
storeMap.put(e.getKey(), e.getValue());

putCnt.incrementAndGet();
}

/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends Object, ? extends Object> e) {
@Override public void write(Cache.Entry<?, ?> e) {
storeMap.put(e.getKey(), e.getValue());

putCnt.incrementAndGet();
Expand Down
Expand Up @@ -57,4 +57,9 @@ public void newSession(@Nullable IgniteTx tx) {

return (Map<K, V>)props;
}

/** {@inheritDoc} */
@Nullable @Override public String cacheName() {
return null;
}
}
Expand Up @@ -55,4 +55,11 @@ public void newSession(@Nullable IgniteTx tx) {

return ses != null ? (Map<K, V>)ses.properties() : null;
}

/** {@inheritDoc} */
@Nullable @Override public String cacheName() {
TestCacheSession ses = sesHolder.get();

return ses != null ? ses.cacheName() : null;
}
}
Expand Up @@ -31,7 +31,7 @@ public class GridCacheGroupLockSelfTestSuite extends TestSuite {
* @throws Exception If failed.
*/
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Gridgain Cache Group Lock Test Suite");
TestSuite suite = new TestSuite("Ignite Cache Group Lock Test Suite");

// One node.
suite.addTest(new TestSuite(GridCacheGroupLockNearSelfTest.class));
Expand Down

0 comments on commit f4b3995

Please sign in to comment.