Skip to content

Commit

Permalink
ignite-4525 - Near reader is created when value is loaded from store.
Browse files Browse the repository at this point in the history
  • Loading branch information
dkarachentsev committed Jan 18, 2017
1 parent d0c0bce commit b54a481
Show file tree
Hide file tree
Showing 19 changed files with 770 additions and 163 deletions.
Expand Up @@ -1789,6 +1789,7 @@ protected IgniteInternalFuture<Map<K, V>> getAllAsync(
subjId = ctx.subjectIdPerCall(subjId, opCtx); subjId = ctx.subjectIdPerCall(subjId, opCtx);


return getAllAsync(keys, return getAllAsync(keys,
null,
opCtx == null || !opCtx.skipStore(), opCtx == null || !opCtx.skipStore(),
!skipTx, !skipTx,
subjId, subjId,
Expand All @@ -1803,6 +1804,7 @@ protected IgniteInternalFuture<Map<K, V>> getAllAsync(


/** /**
* @param keys Keys. * @param keys Keys.
* @param readerArgs Near cache reader will be added if not null.
* @param readThrough Read through. * @param readThrough Read through.
* @param checkTx Check tx. * @param checkTx Check tx.
* @param subjId Subj Id. * @param subjId Subj Id.
Expand All @@ -1817,6 +1819,7 @@ protected IgniteInternalFuture<Map<K, V>> getAllAsync(
* @see GridCacheAdapter#getAllAsync(Collection) * @see GridCacheAdapter#getAllAsync(Collection)
*/ */
public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
@Nullable final ReaderArguments readerArgs,
boolean readThrough, boolean readThrough,
boolean checkTx, boolean checkTx,
@Nullable final UUID subjId, @Nullable final UUID subjId,
Expand All @@ -1834,6 +1837,7 @@ public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collect
validateCacheKeys(keys); validateCacheKeys(keys);


return getAllAsync0(ctx.cacheKeysView(keys), return getAllAsync0(ctx.cacheKeysView(keys),
readerArgs,
readThrough, readThrough,
checkTx, checkTx,
subjId, subjId,
Expand All @@ -1848,6 +1852,7 @@ public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collect


/** /**
* @param keys Keys. * @param keys Keys.
* @param readerArgs Near cache reader will be added if not null.
* @param readThrough Read-through flag. * @param readThrough Read-through flag.
* @param checkTx Check local transaction flag. * @param checkTx Check local transaction flag.
* @param subjId Subject ID. * @param subjId Subject ID.
Expand All @@ -1862,6 +1867,7 @@ public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collect
*/ */
protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0( protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
@Nullable final Collection<KeyCacheObject> keys, @Nullable final Collection<KeyCacheObject> keys,
@Nullable final ReaderArguments readerArgs,
final boolean readThrough, final boolean readThrough,
boolean checkTx, boolean checkTx,
@Nullable final UUID subjId, @Nullable final UUID subjId,
Expand Down Expand Up @@ -1932,7 +1938,8 @@ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
subjId, subjId,
taskName, taskName,
expiry, expiry,
!deserializeBinary); !deserializeBinary,
readerArgs);


assert res != null; assert res != null;


Expand All @@ -1957,7 +1964,8 @@ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
null, null,
taskName, taskName,
expiry, expiry,
!deserializeBinary); !deserializeBinary,
readerArgs);


if (res == null) if (res == null)
ctx.evicts().touch(entry, topVer); ctx.evicts().touch(entry, topVer);
Expand Down Expand Up @@ -2015,29 +2023,28 @@ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
GridCacheEntryEx entry = entryEx(key); GridCacheEntryEx entry = entryEx(key);


try { try {
GridCacheVersion verSet = entry.versionedValue(cacheVal, T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(
cacheVal,
res.version(), res.version(),
null); null,

readerArgs);
boolean set = verSet != null;


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Set value loaded from store into entry [" + log.debug("Set value loaded from store into entry [" +
"set=" + set + "oldVer=" + res.version() +
", curVer=" + res.version() + ", newVer=" + verVal.get2() + ", " +
", newVer=" + verSet + ", " +
"entry=" + entry + ']'); "entry=" + entry + ']');


// Don't put key-value pair into result map if value is null. // Don't put key-value pair into result map if value is null.
if (val != null) { if (verVal.get1() != null) {
ctx.addResult(map, ctx.addResult(map,
key, key,
cacheVal, verVal.get1(),
skipVals, skipVals,
keepCacheObjects, keepCacheObjects,
deserializeBinary, deserializeBinary,
false, true,
needVer ? set ? verSet : res.version() : null); needVer ? verVal.get2() : null);
} }


if (tx0 == null || (!tx0.implicit() && if (tx0 == null || (!tx0.implicit() &&
Expand Down
Expand Up @@ -1900,9 +1900,9 @@ public <K1, V1> void addResult(Map<K1, V1> map,
assert val != null || skipVals; assert val != null || skipVals;


if (!keepCacheObjects) { if (!keepCacheObjects) {
Object key0 = unwrapBinaryIfNeeded(key, !deserializeBinary); Object key0 = unwrapBinaryIfNeeded(key, !deserializeBinary, cpy);


Object val0 = skipVals ? true : unwrapBinaryIfNeeded(val, !deserializeBinary); Object val0 = skipVals ? true : unwrapBinaryIfNeeded(val, !deserializeBinary, cpy);


assert key0 != null : key; assert key0 != null : key;
assert val0 != null : val; assert val0 != null : val;
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


/** /**
Expand Down Expand Up @@ -318,11 +319,12 @@ public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
* @param taskName Task name. * @param taskName Task name.
* @param expiryPlc Expiry policy. * @param expiryPlc Expiry policy.
* @param keepBinary Keep binary flag. * @param keepBinary Keep binary flag.
* @param readerArgs Reader will be added if not null.
* @return Cached value and entry version. * @return Cached value and entry version.
* @throws IgniteCheckedException If loading value failed. * @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed. * @throws GridCacheEntryRemovedException If entry was removed.
*/ */
@Nullable public EntryGetResult innerGetVersioned( public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver, @Nullable GridCacheVersion ver,
IgniteInternalTx tx, IgniteInternalTx tx,
boolean readSwap, boolean readSwap,
Expand All @@ -333,7 +335,8 @@ public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
Object transformClo, Object transformClo,
String taskName, String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc, @Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary) boolean keepBinary,
@Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException; throws IgniteCheckedException, GridCacheEntryRemovedException;


/** /**
Expand All @@ -344,7 +347,7 @@ public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
* @param taskName Task name. * @param taskName Task name.
* @param expiryPlc Expiry policy. * @param expiryPlc Expiry policy.
* @param keepBinary Keep binary flag. * @param keepBinary Keep binary flag.
* @return Cached value and entry version. * @param readerArgs Reader will be added if not null.
* @throws IgniteCheckedException If loading value failed. * @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed. * @throws GridCacheEntryRemovedException If entry was removed.
* @return Cached value, entry version and flag indicating if entry was reserved. * @return Cached value, entry version and flag indicating if entry was reserved.
Expand All @@ -355,7 +358,8 @@ public EntryGetResult innerGetAndReserveForLoad(boolean readSwap,
UUID subjId, UUID subjId,
String taskName, String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc, @Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException; boolean keepBinary,
@Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException;


/** /**
* @param ver Expected entry version. * @param ver Expected entry version.
Expand Down Expand Up @@ -751,13 +755,15 @@ public <K, V> GridCacheVersionedEntryEx<K, V> versionedEntry(final boolean keepB
* @param val New value. * @param val New value.
* @param curVer Version to match or {@code null} if match is not required. * @param curVer Version to match or {@code null} if match is not required.
* @param newVer Version to set. * @param newVer Version to set.
* @return Non null version if value was set. * @param readerArgs Reader will be added if not null.
* @return Current version and value.
* @throws IgniteCheckedException If index could not be updated. * @throws IgniteCheckedException If index could not be updated.
* @throws GridCacheEntryRemovedException If entry was removed. * @throws GridCacheEntryRemovedException If entry was removed.
*/ */
public GridCacheVersion versionedValue(CacheObject val, public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
@Nullable GridCacheVersion curVer, @Nullable GridCacheVersion curVer,
@Nullable GridCacheVersion newVer) @Nullable GridCacheVersion newVer,
@Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException; throws IgniteCheckedException, GridCacheEntryRemovedException;


/** /**
Expand Down
Expand Up @@ -778,7 +778,8 @@ protected final void releaseSwap() throws IgniteCheckedException {
expirePlc, expirePlc,
false, false,
keepBinary, keepBinary,
false); false,
null);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -788,7 +789,8 @@ protected final void releaseSwap() throws IgniteCheckedException {
UUID subjId, UUID subjId,
String taskName, String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc, @Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { boolean keepBinary,
@Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException {
return (EntryGetResult)innerGet0( return (EntryGetResult)innerGet0(
/*ver*/null, /*ver*/null,
/*tx*/null, /*tx*/null,
Expand All @@ -803,11 +805,12 @@ protected final void releaseSwap() throws IgniteCheckedException {
expiryPlc, expiryPlc,
true, true,
keepBinary, keepBinary,
/*reserve*/true); /*reserve*/true,
readerArgs);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public EntryGetResult innerGetVersioned( @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver, @Nullable GridCacheVersion ver,
IgniteInternalTx tx, IgniteInternalTx tx,
boolean readSwap, boolean readSwap,
Expand All @@ -818,7 +821,8 @@ protected final void releaseSwap() throws IgniteCheckedException {
Object transformClo, Object transformClo,
String taskName, String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc, @Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary) boolean keepBinary,
@Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException { throws IgniteCheckedException, GridCacheEntryRemovedException {
return (EntryGetResult)innerGet0(ver, return (EntryGetResult)innerGet0(ver,
tx, tx,
Expand All @@ -833,7 +837,8 @@ protected final void releaseSwap() throws IgniteCheckedException {
expiryPlc, expiryPlc,
true, true,
keepBinary, keepBinary,
false); false,
readerArgs);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -852,7 +857,8 @@ private Object innerGet0(
@Nullable IgniteCacheExpiryPolicy expiryPlc, @Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean retVer, boolean retVer,
boolean keepBinary, boolean keepBinary,
boolean reserveForLoad boolean reserveForLoad,
@Nullable ReaderArguments readerArgs
) throws IgniteCheckedException, GridCacheEntryRemovedException { ) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert !(retVer && readThrough); assert !(retVer && readThrough);
assert !(reserveForLoad && readThrough); assert !(reserveForLoad && readThrough);
Expand Down Expand Up @@ -961,6 +967,8 @@ private Object innerGet0(
// Cache version for optimistic check. // Cache version for optimistic check.
startVer = ver; startVer = ver;


addReaderIfNeed(readerArgs);

if (ret != null) { if (ret != null) {
assert tmp || !(ret instanceof BinaryObjectOffheapImpl); assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
assert !obsolete; assert !obsolete;
Expand Down Expand Up @@ -1051,6 +1059,8 @@ else if (tx.dht()) {


if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
deletedUnlocked(false); deletedUnlocked(false);

assert readerArgs == null;
} }


if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
Expand Down Expand Up @@ -3611,19 +3621,22 @@ private long nextPartCounter(AffinityTopologyVersion topVer) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public synchronized GridCacheVersion versionedValue(CacheObject val, @Override public synchronized T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
GridCacheVersion curVer, GridCacheVersion curVer,
GridCacheVersion newVer) GridCacheVersion newVer,
@Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException throws IgniteCheckedException, GridCacheEntryRemovedException
{ {
checkObsolete(); checkObsolete();


addReaderIfNeed(readerArgs);

if (curVer == null || curVer.equals(ver)) { if (curVer == null || curVer.equals(ver)) {
if (val != this.val) { if (val != this.val) {
GridCacheMvcc mvcc = mvccExtras(); GridCacheMvcc mvcc = mvccExtras();


if (mvcc != null && !mvcc.isEmpty()) if (mvcc != null && !mvcc.isEmpty())
return null; return new T2<>(this.val, ver);


if (newVer == null) if (newVer == null)
newVer = cctx.versions().next(); newVer = cctx.versions().next();
Expand All @@ -3647,13 +3660,32 @@ private long nextPartCounter(AffinityTopologyVersion topVer) {
// Version does not change for load ops. // Version does not change for load ops.
update(val, expTime, ttl, newVer, true); update(val, expTime, ttl, newVer, true);


return newVer; return new T2<>(val, newVer);
} }


assert !evictionDisabled() : this; assert !evictionDisabled() : this;
} }


return null; return new T2<>(this.val, ver);
}

/**
* @param readerArgs Reader arguments
*/
private void addReaderIfNeed(@Nullable ReaderArguments readerArgs) {
if (readerArgs != null) {
assert this instanceof GridDhtCacheEntry : this;
assert Thread.holdsLock(this);

try {
((GridDhtCacheEntry)this).addReader(readerArgs.reader(),
readerArgs.messageId(),
readerArgs.topologyVersion());
}
catch (GridCacheEntryRemovedException e) {
assert false : this;
}
}
} }


/** /**
Expand Down

0 comments on commit b54a481

Please sign in to comment.