Skip to content

Commit

Permalink
IGNITE-4036 - Fix. Near cache is not expired together with correspond…
Browse files Browse the repository at this point in the history
…ing server cache
  • Loading branch information
dkarachentsev committed Jan 26, 2017
1 parent 6f6ff39 commit f5e601e
Show file tree
Hide file tree
Showing 19 changed files with 411 additions and 152 deletions.
Expand Up @@ -24,7 +24,7 @@
*/
public class EntryGetResult {
/** */
private final CacheObject val;
private Object val;

/** */
private final GridCacheVersion ver;
Expand All @@ -35,18 +35,34 @@ public class EntryGetResult {
/**
* @param val Value.
* @param ver Version.
* @param reserved Reserved flag.
*/
EntryGetResult(CacheObject val, GridCacheVersion ver, boolean reserved) {
public EntryGetResult(Object val, GridCacheVersion ver, boolean reserved) {
this.val = val;
this.ver = ver;
this.reserved = reserved;
}

/**
* @param val Value.
* @param ver Version.
*/
public EntryGetResult(Object val, GridCacheVersion ver) {
this(val, ver, false);
}

/**
* @return Value.
*/
public CacheObject value() {
return val;
public <T> T value() {
return (T)val;
}

/**
* @param val Value.
*/
public void value(Object val) {
this.val = val;
}

/**
Expand All @@ -57,9 +73,23 @@ public GridCacheVersion version() {
}

/**
* @return Reserved flag,
* @return Reserved flag.
*/
public boolean reserved() {
return reserved;
}

/**
* @return Entry expire time.
*/
public long expireTime() {
return 0L;
}

/**
* @return Entry time to live.
*/
public long ttl() {
return 0L;
}
}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/**
*
*/
public class EntryGetWithTtlResult extends EntryGetResult {
/** */
private final long expireTime;

/** */
private final long ttl;

/**
* @param val Value.
* @param ver Version.
* @param reserved Reserved flag.
* @param expireTime Entry expire time.
* @param ttl Entry time to live.
*/
public EntryGetWithTtlResult(Object val, GridCacheVersion ver, boolean reserved, long expireTime, long ttl) {
super(val, ver, reserved);
this.expireTime = expireTime;
this.ttl = ttl;
}

/**
* @return Entry expire time.
*/
@Override public long expireTime() {
return expireTime;
}

/**
* @return Entry time to live.
*/
@Override public long ttl() {
return ttl;
}
}
Expand Up @@ -1421,20 +1421,21 @@ public V getTopologySafe(K key) throws IgniteCheckedException {
if (keepBinary)
key = (K)ctx.toCacheKeyObject(key);

T2<V, GridCacheVersion> t = (T2<V, GridCacheVersion>)get(key, !keepBinary, true);
EntryGetResult t
= (EntryGetResult)get(key, !keepBinary, true);

CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(
keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false) : key,
t.get1(),
t.get2())
(V)t.value(),
t.version())
: null;

if (ctx.config().getInterceptor() != null) {
key = keepBinary ? (K) ctx.unwrapBinaryIfNeeded(key, true, false) : key;

V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);

val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null) : null;
val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.version() : null) : null;
}

if (statsEnabled)
Expand Down Expand Up @@ -1484,37 +1485,37 @@ public V getTopologySafe(K key) throws IgniteCheckedException {

final K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key;

IgniteInternalFuture<T2<V, GridCacheVersion>> fut =
(IgniteInternalFuture<T2<V, GridCacheVersion>>)getAsync(key0, !keepBinary, true);
IgniteInternalFuture<EntryGetResult> fut =
(IgniteInternalFuture<EntryGetResult>)getAsync(key0, !keepBinary, true);

final boolean intercept = ctx.config().getInterceptor() != null;

IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain(
new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() {
@Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
new CX1<IgniteInternalFuture<EntryGetResult>, CacheEntry<K, V>>() {
@Override public CacheEntry<K, V> applyx(IgniteInternalFuture<EntryGetResult> f)
throws IgniteCheckedException {
T2<V, GridCacheVersion> t = f.get();
EntryGetResult t = f.get();

K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;

CacheEntry val = t != null ? new CacheEntryImplEx<>(
key,
t.get1(),
t.get2())
t.value(),
t.version())
: null;

if (intercept) {
V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);

return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.get2() : null) : null;
return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null;
}
else
return val;
}
});

if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start));
fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start));

return fr;
}
Expand Down Expand Up @@ -1547,15 +1548,15 @@ public V getTopologySafe(K key) throws IgniteCheckedException {

long start = statsEnabled ? System.nanoTime() : 0L;

Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true);
Map<K, EntryGetResult> map = (Map<K, EntryGetResult>)getAll(keys, !ctx.keepBinary(), true);

Collection<CacheEntry<K, V>> res = new HashSet<>();

if (ctx.config().getInterceptor() != null)
res = interceptGetEntries(keys, map);
else
for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet())
res.add(new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2()));
for (Map.Entry<K, EntryGetResult> e : map.entrySet())
res.add(new CacheEntryImplEx<>(e.getKey(), (V)e.getValue().value(), e.getValue().version()));

if (statsEnabled)
metrics0().addGetTimeNanos(System.nanoTime() - start);
Expand Down Expand Up @@ -1595,32 +1596,32 @@ public V getTopologySafe(K key) throws IgniteCheckedException {

final long start = statsEnabled ? System.nanoTime() : 0L;

IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut =
(IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>)
IgniteInternalFuture<Map<K, EntryGetResult>> fut =
(IgniteInternalFuture<Map<K, EntryGetResult>>)
((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true));

final boolean intercept = ctx.config().getInterceptor() != null;

IgniteInternalFuture<Collection<CacheEntry<K, V>>> rf =
fut.chain(new CX1<IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>, Collection<CacheEntry<K, V>>>() {
fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryGetResult>>, Collection<CacheEntry<K, V>>>() {
@Override public Collection<CacheEntry<K, V>> applyx(
IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> f) throws IgniteCheckedException {
IgniteInternalFuture<Map<K, EntryGetResult>> f) throws IgniteCheckedException {
if (intercept)
return interceptGetEntries(keys, f.get());
else {
Map<K, CacheEntry<K, V>> res = U.newHashMap(f.get().size());

for (Map.Entry<K, T2<V, GridCacheVersion>> e : f.get().entrySet())
for (Map.Entry<K, EntryGetResult> e : f.get().entrySet())
res.put(e.getKey(),
new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2()));
new CacheEntryImplEx<>(e.getKey(), (V)e.getValue().value(), e.getValue().version()));

return res.values();
}
}
});

if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<Map<K, T2<V, GridCacheVersion>>>(metrics0(), start));
fut.listen(new UpdateGetTimeStatClosure<Map<K, EntryGetResult>>(metrics0(), start));

return rf;
}
Expand Down Expand Up @@ -1675,7 +1676,7 @@ private Map<K, V> interceptGet(@Nullable Collection<? extends K> keys, Map<K, V>
*/
@SuppressWarnings("IfMayBeConditional")
private Collection<CacheEntry<K, V>> interceptGetEntries(
@Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) {
@Nullable Collection<? extends K> keys, Map<K, EntryGetResult> map) {
Map<K, CacheEntry<K, V>> res;

if (F.isEmpty(keys)) {
Expand All @@ -1690,11 +1691,11 @@ private Collection<CacheEntry<K, V>> interceptGetEntries(

assert interceptor != null;

for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) {
V val = interceptor.onGet(e.getKey(), e.getValue().get1());
for (Map.Entry<K, EntryGetResult> e : map.entrySet()) {
V val = interceptor.onGet(e.getKey(), (V)e.getValue().value());

if (val != null)
res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().get2()));
res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().version()));
}

if (map.size() != keys.size()) { // Not all requested keys were in cache.
Expand Down Expand Up @@ -1976,12 +1977,12 @@ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
if (res != null) {
ctx.addResult(map,
key,
res.value(),
res,
skipVals,
keepCacheObjects,
deserializeBinary,
true,
needVer ? res.version() : null);
needVer);

if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
ctx.evicts().touch(entry, topVer);
Expand Down Expand Up @@ -2025,7 +2026,7 @@ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
GridCacheEntryEx entry = entryEx(key);

try {
T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(
EntryGetResult verVal = entry.versionedValue(
cacheVal,
res.version(),
null,
Expand All @@ -2035,19 +2036,19 @@ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
if (log.isDebugEnabled())
log.debug("Set value loaded from store into entry [" +
"oldVer=" + res.version() +
", newVer=" + verVal.get2() + ", " +
", newVer=" + verVal.version() + ", " +
"entry=" + entry + ']');

// Don't put key-value pair into result map if value is null.
if (verVal.get1() != null) {
if (verVal.value() != null) {
ctx.addResult(map,
key,
verVal.get1(),
verVal,
skipVals,
keepCacheObjects,
deserializeBinary,
true,
needVer ? verVal.get2() : null);
needVer);
}

if (tx0 == null || (!tx0.implicit() &&
Expand Down

0 comments on commit f5e601e

Please sign in to comment.