Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Mar 9, 2015
1 parent 1e4ce34 commit 52c6106
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 459 deletions.
Expand Up @@ -380,50 +380,6 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> {
*/ */
public IgniteInternalFuture<V> reloadAsync(K key); public IgniteInternalFuture<V> reloadAsync(K key);


/**
* Reloads all currently cached keys form persistent storage.
* <h2 class="header">Transactions</h2>
* This method does not participate in transactions, however it does not violate
* cache integrity and can be used safely with or without transactions.
*
* @throws IgniteCheckedException If reloading failed.
*/
public void reloadAll() throws IgniteCheckedException;

/**
* Asynchronously reloads all specified entries from underlying
* persistent storage.
* <h2 class="header">Transactions</h2>
* This method does not participate in transactions, however it does not violate
* cache integrity and can be used safely with or without transactions.
*
* @return Future which will complete whenever {@code reload} completes.
*/
public IgniteInternalFuture<?> reloadAllAsync();

/**
* Reloads specified entries from underlying persistent storage.
* <h2 class="header">Transactions</h2>
* This method does not participate in transactions, however it does not violate
* cache integrity and can be used safely with or without transactions.
*
* @param keys Keys to reload.
* @throws IgniteCheckedException if reloading failed.
*/
public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException;

/**
* Asynchronously reloads all specified entries from underlying
* persistent storage.
* <h2 class="header">Transactions</h2>
* This method does not participate in transactions, however it does not violate
* cache integrity and can be used safely with or without transactions.
*
* @param keys Keys to reload.
* @return Future which will complete whenever {@code reload} completes.
*/
public IgniteInternalFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys);

/** /**
* Peeks at in-memory cached value using default {@link GridCachePeekMode#SMART} * Peeks at in-memory cached value using default {@link GridCachePeekMode#SMART}
* peek mode. * peek mode.
Expand Down
Expand Up @@ -1022,7 +1022,7 @@ else if (m == DB) {
} }


if (val != null) if (val != null)
return F.t((V)val.get().value(ctx.cacheObjectContext(), true)); return F.t(CU.<V>value(val.get(), ctx, true));
} }
} }
catch (GridCacheEntryRemovedException ignore) { catch (GridCacheEntryRemovedException ignore) {
Expand Down Expand Up @@ -1567,25 +1567,6 @@ public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException
return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true, false); return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true, false);
} }


/** {@inheritDoc} */
@Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
reloadAll(keys, false, false);
}

/** {@inheritDoc} */
@Override public void reloadAll() throws IgniteCheckedException {
ctx.denyOnFlags(F.asList(LOCAL, READ));

reloadAll(keySet());
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> reloadAllAsync() {
ctx.denyOnFlags(F.asList(LOCAL, READ));

return reloadAllAsync(keySet());
}

/** /**
* @param keys Keys. * @param keys Keys.
* @param reload Reload flag. * @param reload Reload flag.
Expand Down Expand Up @@ -1616,190 +1597,6 @@ public IgniteInternalFuture<Object> readThroughAllAsync(final Collection<KeyCach
}, true); }, true);
} }


/**
* @param keys Keys.
* @param ret Return flag.
* @return Non-{@code null} map if return flag is {@code true}.
* @throws IgniteCheckedException If failed.
*/
@Nullable public Map<K, V> reloadAll(@Nullable Collection<? extends K> keys, boolean ret, boolean skipVals)
throws IgniteCheckedException {
UUID subjId = ctx.subjectIdPerCall(null);

String taskName = ctx.kernalContext().job().currentTaskName();

return reloadAllAsync(keys, ret, skipVals, subjId, taskName).get();
}

/**
* @param keys Keys.
* @param ret Return flag.
* @return Future.
*/
public IgniteInternalFuture<Map<K, V>> reloadAllAsync(@Nullable Collection<? extends K> keys,
boolean ret,
boolean skipVals,
@Nullable UUID subjId,
String taskName)
{
ctx.denyOnFlag(READ);

final long topVer = ctx.affinity().affinityTopologyVersion();

// TODO IGNITE-51.
List<KeyCacheObject> cacheKeys = new ArrayList<>(keys.size());

if (!F.isEmpty(keys)) {
final String uid = CU.uuid(); // Get meta UUID for this thread.

assert keys != null;

if (keyCheck)
validateCacheKeys(keys);

for (K key : keys) {
if (key == null)
continue;

KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);

cacheKeys.add(cacheKey);

// Skip primary or backup entries for near cache.
if (ctx.isNear() && ctx.affinity().localNode(cacheKey, topVer))
continue;

while (true) {
try {
GridCacheEntryEx entry = entryExSafe(cacheKey, topVer);

if (entry == null)
break;

// Get version before checking filer.
GridCacheVersion ver = entry.version();

// Tag entry with current version.
entry.addMeta(uid, ver);

break;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry for reload (will retry): " + key);
}
catch (GridDhtInvalidPartitionException ignore) {
if (log.isDebugEnabled())
log.debug("Got invalid partition for key (will skip): " + key);

break;
}
}
}

final Map<K, V> map = ret ? new HashMap<K, V>(keys.size(), 1.0f) : null;

final Collection<KeyCacheObject> absentKeys = F.view(cacheKeys, CU.keyHasMeta(ctx, uid));

final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();

IgniteInternalFuture<Object> readFut =
readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() {
/** Version for all loaded entries. */
private GridCacheVersion nextVer = ctx.versions().next();

/** {@inheritDoc} */
@Override public void apply(KeyCacheObject key, Object val) {
loadedKeys.add(key);

GridCacheEntryEx entry = peekEx(key);

if (entry != null) {
try {
GridCacheVersion curVer = entry.removeMeta(uid);

// If entry passed the filter.
if (curVer != null) {
boolean wasNew = entry.isNewLocked();

entry.unswap();

CacheObject cacheVal = ctx.toCacheObject(val);

boolean set = entry.versionedValue(cacheVal, curVer, nextVer);

ctx.evicts().touch(entry, topVer);

if (map != null) {
if (set || wasNew)
map.put(key.<K>value(ctx.cacheObjectContext(), false), (V)val);
else {
try {
// TODO IGNITE-51.
K k = key.<K>value(ctx.cacheObjectContext(), false);

GridTuple<V> v = peek0(false, k, GLOBAL);

if (v != null)
map.put(k, v.get());
}
catch (GridCacheFilterFailedException ex) {
ex.printStackTrace();

assert false;
}
}
}

if (log.isDebugEnabled()) {
log.debug("Set value loaded from store into entry [set=" + set + ", " +
"curVer=" +
curVer + ", newVer=" + nextVer + ", entry=" + entry + ']');
}
}
else {
if (log.isDebugEnabled()) {
log.debug("Current version was not found (either entry was removed or " +
"validation was not passed: " + entry);
}
}
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled()) {
log.debug("Got removed entry for reload (will not store reloaded entry) " +
"[entry=" + entry + ']');
}
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}
});

return readFut.chain(new CX1<IgniteInternalFuture<Object>, Map<K, V>>() {
@Override public Map<K, V> applyx(IgniteInternalFuture<Object> e) throws IgniteCheckedException {
// Touch all not loaded keys.
for (KeyCacheObject key : absentKeys) {
if (!loadedKeys.contains(key)) {
GridCacheEntryEx entry = peekEx(key);

if (entry != null)
ctx.evicts().touch(entry, topVer);
}
}

// Make sure there were no exceptions.
e.get();

return map;
}
});
}

return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
}

/** /**
* @param keys Keys. * @param keys Keys.
* @param ret Return flag. * @param ret Return flag.
Expand Down Expand Up @@ -5207,20 +5004,6 @@ public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializePortabl
} }
} }


/**
* @param keys Keys.
* @return Reload future.
*/
@Override public IgniteInternalFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) {
UUID subjId = ctx.subjectIdPerCall(null);

String taskName = ctx.kernalContext().job().currentTaskName();



return reloadAllAsync(keys, false, false, subjId, taskName);
}

/** /**
* @param key Key. * @param key Key.
* @return Reload future. * @return Reload future.
Expand Down
Expand Up @@ -479,26 +479,6 @@ boolean isAll(K k, V v) {
return cache.reloadAsync(key); return cache.reloadAsync(key);
} }


/** {@inheritDoc} */
@Override public void reloadAll() throws IgniteCheckedException {
cache.reloadAll();
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> reloadAllAsync() {
return cache.reloadAllAsync();
}

/** {@inheritDoc} */
@Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
cache.reloadAll(keys);
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) {
return cache.reloadAllAsync(keys);
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public V get(K key) throws IgniteCheckedException { @Override public V get(K key) throws IgniteCheckedException {
return cache.get(key, deserializePortables()); return cache.get(key, deserializePortables());
Expand Down
Expand Up @@ -413,54 +413,6 @@ public GridCacheProjectionImpl<K, V> gateProjection() {
} }
} }


/** {@inheritDoc} */
@Override public void reloadAll() throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
delegate.reloadAll();
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> reloadAllAsync() {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
return delegate.reloadAllAsync();
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
delegate.reloadAll(keys);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
return delegate.reloadAllAsync(keys);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public V get(K key) throws IgniteCheckedException { @Nullable @Override public V get(K key) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj); GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
Expand Down

0 comments on commit 52c6106

Please sign in to comment.