Skip to content

Commit

Permalink
Pass strongly typed cache objects to the cache store manager
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharuk committed Feb 3, 2016
1 parent ddfe632 commit 0d7025c
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 56 deletions.
Expand Up @@ -1220,7 +1220,7 @@ else if (interceptorVal != val0)
// Persist outside of synchronization. The correctness of the // Persist outside of synchronization. The correctness of the
// value will be handled by current transaction. // value will be handled by current transaction.
if (writeThrough) if (writeThrough)
cctx.store().put(tx, keyValue(false), CU.value(val, cctx, false), newVer); cctx.store().put(tx, key, val, newVer);


if (intercept) if (intercept)
cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary)); cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary));
Expand Down Expand Up @@ -1654,7 +1654,7 @@ else if (ttl != CU.TTL_ZERO)


if (writeThrough) if (writeThrough)
// Must persist inside synchronization in non-tx mode. // Must persist inside synchronization in non-tx mode.
cctx.store().put(null, keyValue(false), CU.value(updated, cctx, false), ver); cctx.store().put(null, key, updated, ver);


// Update index inside synchronization since it can be updated // Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock. // in load methods without actually holding entry lock.
Expand Down Expand Up @@ -1688,7 +1688,7 @@ else if (ttl != CU.TTL_ZERO)
else { else {
if (writeThrough) if (writeThrough)
// Must persist inside synchronization in non-tx mode. // Must persist inside synchronization in non-tx mode.
cctx.store().remove(null, keyValue(false)); cctx.store().remove(null, key);


boolean hasValPtr = hasOffHeapPointer(); boolean hasValPtr = hasOffHeapPointer();


Expand Down Expand Up @@ -1905,10 +1905,10 @@ else if (ttl != CU.TTL_ZERO)
if (val == null) { if (val == null) {
assert deletedUnlocked(); assert deletedUnlocked();


cctx.store().remove(null, keyValue(false)); cctx.store().remove(null, key);
} }
else else
cctx.store().put(null, keyValue(false), CU.value(val, cctx, false), ver); cctx.store().put(null, key, val, ver);
} }


return new GridCacheUpdateAtomicResult(false, return new GridCacheUpdateAtomicResult(false,
Expand Down Expand Up @@ -1958,10 +1958,10 @@ else if (ttl != CU.TTL_ZERO)
if (val == null) { if (val == null) {
assert deletedUnlocked(); assert deletedUnlocked();


cctx.store().remove(null, keyValue(false)); cctx.store().remove(null, key);
} }
else else
cctx.store().put(null, keyValue(false), CU.value(val, cctx, false), ver); cctx.store().put(null, key, val, ver);
} }
else { else {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down Expand Up @@ -3244,7 +3244,7 @@ else if (deletedUnlocked())


if (cctx.store().isLocal()) { if (cctx.store().isLocal()) {
if (val != null) if (val != null)
cctx.store().put(null, keyValue(false), CU.value(val, cctx, false), ver); cctx.store().put(null, key, val, ver);
} }


return true; return true;
Expand Down
Expand Up @@ -590,7 +590,7 @@ public boolean clearInternal(
} }


if (cctx.store().isLocal()) if (cctx.store().isLocal())
cctx.store().remove(null, keyValue(false)); cctx.store().remove(null, key);


rmv = true; rmv = true;


Expand Down
Expand Up @@ -572,7 +572,7 @@ private void clearSwap() {
cctx.swap().remove(key); cctx.swap().remove(key);


if (isLocStore) if (isLocStore)
cctx.store().remove(null, key.value(cctx.cacheObjectContext(), false)); cctx.store().remove(null, key);
} }
} }
} }
Expand Down
Expand Up @@ -2241,11 +2241,14 @@ else if (F.contains(readers, node.id())) // Reader became primary or backup.
putMap; putMap;


try { try {
ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(storeMap,
@Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) { new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() {
return F.t(v, ver); @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) {
} return F.t(val, ver);
})); }
});

ctx.store().putAll(null, view);
} }
catch (CacheStorePartialUpdateException e) { catch (CacheStorePartialUpdateException e) {
storeErr = e; storeErr = e;
Expand Down
Expand Up @@ -73,6 +73,7 @@
import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionIsolation;
Expand Down Expand Up @@ -1128,9 +1129,9 @@ private Map<K, EntryProcessorResult> updateWithBatch(
try { try {
int size = locked.size(); int size = locked.size();


Map<Object, Object> putMap = null; Map<KeyCacheObject, CacheObject> putMap = null;


Collection<Object> rmvKeys = null; Collection<KeyCacheObject> rmvKeys = null;


List<CacheObject> writeVals = null; List<CacheObject> writeVals = null;


Expand Down Expand Up @@ -1252,7 +1253,7 @@ private Map<K, EntryProcessorResult> updateWithBatch(
if (rmvKeys == null) if (rmvKeys == null)
rmvKeys = new ArrayList<>(size); rmvKeys = new ArrayList<>(size);


rmvKeys.add(entry.key().value(ctx.cacheObjectContext(), false)); rmvKeys.add(entry.key());
} }
else { else {
if (intercept) { if (intercept) {
Expand Down Expand Up @@ -1290,7 +1291,7 @@ private Map<K, EntryProcessorResult> updateWithBatch(
writeVals = new ArrayList<>(size); writeVals = new ArrayList<>(size);
} }


putMap.put(CU.value(entry.key(), ctx, false), CU.value(updated, ctx, false)); putMap.put(entry.key(), updated);
writeVals.add(updated); writeVals.add(updated);
} }
} }
Expand Down Expand Up @@ -1326,7 +1327,7 @@ else if (op == UPDATE) {
writeVals = new ArrayList<>(size); writeVals = new ArrayList<>(size);
} }


putMap.put(CU.value(entry.key(), ctx, false), CU.value(cacheVal, ctx, false)); putMap.put(entry.key(), cacheVal);
writeVals.add(cacheVal); writeVals.add(cacheVal);
} }
else { else {
Expand Down Expand Up @@ -1357,7 +1358,7 @@ else if (op == UPDATE) {
if (rmvKeys == null) if (rmvKeys == null)
rmvKeys = new ArrayList<>(size); rmvKeys = new ArrayList<>(size);


rmvKeys.add(entry.key().value(ctx.cacheObjectContext(), false)); rmvKeys.add(entry.key());
} }


filtered.add(entry); filtered.add(entry);
Expand Down Expand Up @@ -1417,8 +1418,8 @@ else if (op == UPDATE) {
List<GridCacheEntryEx> entries, List<GridCacheEntryEx> entries,
final GridCacheVersion ver, final GridCacheVersion ver,
@Nullable List<CacheObject> writeVals, @Nullable List<CacheObject> writeVals,
@Nullable Map<Object, Object> putMap, @Nullable Map<KeyCacheObject, CacheObject> putMap,
@Nullable Collection<Object> rmvKeys, @Nullable Collection<KeyCacheObject> rmvKeys,
@Nullable ExpiryPolicy expiryPlc, @Nullable ExpiryPolicy expiryPlc,
boolean keepBinary, boolean keepBinary,
@Nullable CachePartialUpdateCheckedException err, @Nullable CachePartialUpdateCheckedException err,
Expand All @@ -1433,11 +1434,14 @@ else if (op == UPDATE) {
try { try {
if (putMap != null) { if (putMap != null) {
try { try {
ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap,
@Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() {
return F.t(v, ver); @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) {
} return F.t(val, ver);
})); }
});

ctx.store().putAll(null, view);
} }
catch (CacheStorePartialUpdateException e) { catch (CacheStorePartialUpdateException e) {
storeErr = e; storeErr = e;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.Map; import java.util.Map;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheManager; import org.apache.ignite.internal.processors.cache.GridCacheManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
Expand All @@ -33,7 +34,7 @@
/** /**
* Cache store manager interface. * Cache store manager interface.
*/ */
public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> { public interface CacheStoreManager extends GridCacheManager {
/** /**
* Initialize store manager. * Initialize store manager.
* *
Expand Down Expand Up @@ -131,7 +132,7 @@ public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheV
* @return {@code true} If there is a persistent storage. * @return {@code true} If there is a persistent storage.
* @throws IgniteCheckedException If storage failed. * @throws IgniteCheckedException If storage failed.
*/ */
public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) public boolean put(@Nullable IgniteInternalTx tx, KeyCacheObject key, CacheObject val, GridCacheVersion ver)
throws IgniteCheckedException; throws IgniteCheckedException;


/** /**
Expand All @@ -142,24 +143,26 @@ public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCa
* @return {@code True} if there is a persistent storage. * @return {@code True} if there is a persistent storage.
* @throws IgniteCheckedException If storage failed. * @throws IgniteCheckedException If storage failed.
*/ */
public boolean putAll(@Nullable IgniteInternalTx tx, Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map) public boolean putAll(
throws IgniteCheckedException; @Nullable IgniteInternalTx tx,
Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map
) throws IgniteCheckedException;


/** /**
* @param tx Cache transaction. * @param tx Cache transaction.
* @param key Key. * @param key Key.
* @return {@code True} if there is a persistent storage. * @return {@code True} if there is a persistent storage.
* @throws IgniteCheckedException If storage failed. * @throws IgniteCheckedException If storage failed.
*/ */
public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException; public boolean remove(@Nullable IgniteInternalTx tx, KeyCacheObject key) throws IgniteCheckedException;


/** /**
* @param tx Cache transaction. * @param tx Cache transaction.
* @param keys Key. * @param keys Key.
* @return {@code True} if there is a persistent storage. * @return {@code True} if there is a persistent storage.
* @throws IgniteCheckedException If storage failed. * @throws IgniteCheckedException If storage failed.
*/ */
public boolean removeAll(@Nullable IgniteInternalTx tx, Collection<Object> keys) public boolean removeAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys)
throws IgniteCheckedException; throws IgniteCheckedException;


/** /**
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper; import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper;
import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException; import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.GridCacheInternal;
Expand All @@ -60,7 +61,6 @@
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -526,25 +526,25 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) @Override public boolean put(@Nullable IgniteInternalTx tx, KeyCacheObject key, CacheObject val, GridCacheVersion ver)
throws IgniteCheckedException { throws IgniteCheckedException {
if (store != null) { if (store != null) {
// Never persist internal keys. // Never persist internal keys.
if (key instanceof GridCacheInternal) if (key instanceof GridCacheInternal)
return true; return true;


key = cctx.unwrapBinaryIfNeeded(key, !convertBinary()); Object key0 = cctx.unwrapBinaryIfNeeded(key, !convertBinary());
val = cctx.unwrapBinaryIfNeeded(val, !convertBinary()); Object val0 = cctx.unwrapBinaryIfNeeded(val, !convertBinary());


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); log.debug("Storing value in cache store [key=" + key0 + ", val=" + val0 + ']');


sessionInit0(tx); sessionInit0(tx);


boolean threwEx = true; boolean threwEx = true;


try { try {
store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); store.write(new CacheEntryImpl<>(key0, locStore ? F.t(val0, ver) : val0));


threwEx = false; threwEx = false;
} }
Expand All @@ -562,7 +562,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
} }


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']'); log.debug("Stored value in cache store [key=" + key0 + ", val=" + val0 + ']');


return true; return true;
} }
Expand All @@ -571,13 +571,16 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean putAll(@Nullable IgniteInternalTx tx, Map map) throws IgniteCheckedException { @Override public boolean putAll(
@Nullable IgniteInternalTx tx,
Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map
) throws IgniteCheckedException {
if (F.isEmpty(map)) if (F.isEmpty(map))
return true; return true;


if (map.size() == 1) { if (map.size() == 1) {
Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = Map.Entry<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> e =
((Map<Object, IgniteBiTuple<Object, GridCacheVersion>>)map).entrySet().iterator().next(); ((Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>)map).entrySet().iterator().next();


return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2());
} }
Expand Down Expand Up @@ -630,23 +633,23 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException { @Override public boolean remove(@Nullable IgniteInternalTx tx, KeyCacheObject key) throws IgniteCheckedException {
if (store != null) { if (store != null) {
// Never remove internal key from store as it is never persisted. // Never remove internal key from store as it is never persisted.
if (key instanceof GridCacheInternal) if (key instanceof GridCacheInternal)
return false; return false;


key = cctx.unwrapBinaryIfNeeded(key, !convertBinary()); Object key0 = cctx.unwrapBinaryIfNeeded(key, !convertBinary());


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Removing value from cache store [key=" + key + ']'); log.debug("Removing value from cache store [key=" + key0 + ']');


sessionInit0(tx); sessionInit0(tx);


boolean threwEx = true; boolean threwEx = true;


try { try {
store.delete(key); store.delete(key0);


threwEx = false; threwEx = false;
} }
Expand All @@ -664,7 +667,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
} }


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Removed value from cache store [key=" + key + ']'); log.debug("Removed value from cache store [key=" + key0 + ']');


return true; return true;
} }
Expand All @@ -673,12 +676,15 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException { @Override public boolean removeAll(
@Nullable IgniteInternalTx tx,
Collection<? extends KeyCacheObject> keys
) throws IgniteCheckedException {
if (F.isEmpty(keys)) if (F.isEmpty(keys))
return true; return true;


if (keys.size() == 1) { if (keys.size() == 1) {
Object key = keys.iterator().next(); KeyCacheObject key = keys.iterator().next();


return remove(tx, key); return remove(tx, key);
} }
Expand Down Expand Up @@ -1024,7 +1030,7 @@ private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
*/ */
private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> { private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> {
/** */ /** */
private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map; private final Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map;


/** */ /** */
private Set<Object> rmvd; private Set<Object> rmvd;
Expand All @@ -1035,7 +1041,7 @@ private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> {
/** /**
* @param map Map. * @param map Map.
*/ */
private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) { private EntriesView(Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map) {
assert map != null; assert map != null;


this.map = map; this.map = map;
Expand Down

0 comments on commit 0d7025c

Please sign in to comment.