Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Jan 20, 2015
1 parent 1eb8ba3 commit 854664a
Show file tree
Hide file tree
Showing 15 changed files with 894 additions and 159 deletions.
Expand Up @@ -1379,11 +1379,10 @@ private boolean evictx(K key, GridCacheVersion ver,

return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
taskName, true).chain(new CX1<IgniteFuture<Map<K, V>>, V>() {
@Override
public V applyx(IgniteFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
@Override public V applyx(IgniteFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3385,7 +3384,7 @@ public IgniteFuture<?> loadAll(final Set<? extends K> keys,
if (keys.size() < 10) {
for (K key : keys) {
if (key == null)
throw new NullPointerException();
throw new NullPointerException("Key to load is null.");
}
}

Expand All @@ -3394,85 +3393,38 @@ public IgniteFuture<?> loadAll(final Set<? extends K> keys,

if (replaceExisting) {
if (ctx.store().isLocalStore()) {
assert false;
Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes();

return null;
return ctx.closures().callAsyncNoFailover(BROADCAST,
new LoadKeysCallable<>(ctx.name(), keys, true),
nodes,
true);
}
else {
return ctx.closures().callLocalSafe(new Callable<Void>() {
@Override public Void call() throws Exception {
loadAll(keys);
localLoadAndUpdate(keys);

return null;
}
});
}
}
else {
return ctx.closures().callLocalSafe(new Callable<Void>() {
@Override public Void call() throws Exception {
// Version for all loaded entries.
final GridCacheVersion ver0 = ctx.versions().nextForLoad();
final boolean replicate = ctx.isDrEnabled();
final long topVer = ctx.affinity().affinityTopologyVersion();

ctx.store().loadAllFromStore(null, keys, new CIX2<K, V>() {
@Override public void applyx(K key, V val)
throws PortableException {
if (ctx.portableEnabled()) {
key = (K)ctx.marshalToPortable(key);
val = (V)ctx.marshalToPortable(val);
}
Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes();

GridCacheEntryEx<K, V> entry = entryEx(key, false);

try {
entry.initialValue(val, null, ver0, 0, -1, false, topVer, replicate ? DR_LOAD : DR_NONE);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry during loadCache (will ignore): " + entry);
}
finally {
ctx.evicts().touch(entry, topVer);
}

CU.unwindEvicts(ctx);
}
});

return null;
}
});
}
}

/**
* @param keys Keys.
* @throws IgniteCheckedException If failed.
*/
private void loadAllLocalStore(final Set<? extends K> keys) throws IgniteCheckedException {
assert ctx.store().isLocalStore();

try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());

LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0);

ctx.store().loadAllFromLocalStore(null, keys, c);

c.onDone();
return ctx.closures().callAsyncNoFailover(BROADCAST,
new LoadKeysCallable<>(ctx.name(), keys, false),
nodes,
true);
}
}

/**
* @param keys Keys.
* @throws IgniteCheckedException If failed.
*/
private void loadAll(final Set<? extends K> keys) throws IgniteCheckedException {
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize());

Expand All @@ -3498,6 +3450,37 @@ private void loadAll(final Set<? extends K> keys) throws IgniteCheckedException
}
}

/**
* @param keys Keys to load.
* @throws IgniteCheckedException If failed.
*/
public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException {
final boolean replicate = ctx.isDrEnabled();
final long topVer = ctx.affinity().affinityTopologyVersion();

if (ctx.store().isLocalStore()) {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());

LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0);

ctx.store().localStoreLoadAll(null, keys, c);

c.onDone();
}
}
else {
// Version for all loaded entries.
final GridCacheVersion ver0 = ctx.versions().nextForLoad();

ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() {
@Override public void apply(K key, V val) {
loadEntry(key, val, ver0, null, topVer, replicate, 0);
}
});
}
}

/** {@inheritDoc} */
@Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args)
throws IgniteCheckedException {
Expand All @@ -3520,38 +3503,58 @@ private void loadAll(final Set<? extends K> keys) throws IgniteCheckedException
final GridCacheVersion ver0 = ctx.versions().nextForLoad();

ctx.store().loadCache(new CIX3<K, V, GridCacheVersion>() {
@Override public void applyx(K key, V val, @Nullable GridCacheVersion ver)
@Override
public void applyx(K key, V val, @Nullable GridCacheVersion ver)
throws PortableException {
assert ver == null;

if (p != null && !p.apply(key, val))
return;
loadEntry(key, val, ver0, p, topVer, replicate, ttl);
}
}, args);
}
}

if (ctx.portableEnabled()) {
key = (K)ctx.marshalToPortable(key);
val = (V)ctx.marshalToPortable(val);
}
/**
* @param key Key.
* @param val Value.
* @param ver Cache version.
* @param p Optional predicate.
* @param topVer Topology version.
* @param replicate Replication flag.
* @param ttl TTL.
*/
private void loadEntry(K key,
V val,
GridCacheVersion ver,
@Nullable IgniteBiPredicate<K, V> p,
long topVer,
boolean replicate,
long ttl) {
if (p != null && !p.apply(key, val))
return;

GridCacheEntryEx<K, V> entry = entryEx(key, false);
if (ctx.portableEnabled()) {
key = (K)ctx.marshalToPortable(key);
val = (V)ctx.marshalToPortable(val);
}

try {
entry.initialValue(val, null, ver0, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry during loadCache (will ignore): " + entry);
}
finally {
ctx.evicts().touch(entry, topVer);
}
GridCacheEntryEx<K, V> entry = entryEx(key, false);

CU.unwindEvicts(ctx);
}
}, args);
try {
entry.initialValue(val, null, ver, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry during loadCache (will ignore): " + entry);
}
finally {
ctx.evicts().touch(entry, topVer);
}

CU.unwindEvicts(ctx);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -5191,6 +5194,83 @@ public GetExpiryPolicy(long accessTtl) {
}
}

/**
*
*/
static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externalizable{
/** */
private static final long serialVersionUID = 0L;

/** Cache name. */
private String cacheName;

/** Injected grid instance. */
@IgniteInstanceResource
private Ignite ignite;

/** Keys to load. */
private Collection<? extends K> keys;

/** Update flag. */
private boolean update;

/**
* Required by {@link Externalizable}.
*/
public LoadKeysCallable() {
// No-op.
}

/**
* @param cacheName Cache name.
* @param keys Keys.
* @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)}
* otherwise {@link #localLoad(Collection)}.
*/
LoadKeysCallable(String cacheName, Collection<? extends K> keys, boolean update) {
this.cacheName = cacheName;
this.keys = keys;
this.update = update;
}

/** {@inheritDoc} */
@Override public Void call() throws Exception {
GridCacheAdapter<K, V> cache = ((GridKernal)ignite).context().cache().internalCache(cacheName);

cache.context().gate().enter();

try {
if (update)
cache.localLoadAndUpdate(keys);
else
cache.localLoad(keys);
}
finally {
cache.context().gate().leave();
}

return null;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, cacheName);

U.writeCollection(out, keys);

out.writeBoolean(update);
}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cacheName = U.readString(in);

keys = U.readCollection(in);

update = in.readBoolean();
}
}

/**
*
*/
Expand Down
Expand Up @@ -294,7 +294,7 @@ public boolean writeToStoreFromDht() {
* @param vis Closer to cache loaded elements.
* @throws IgniteCheckedException If data loading failed.
*/
public void loadAllFromLocalStore(@Nullable IgniteTx tx,
public void localStoreLoadAll(@Nullable IgniteTx tx,
Collection<? extends K> keys,
final GridInClosure3<K, V, GridCacheVersion> vis) throws IgniteCheckedException {
assert store != null;
Expand Down

0 comments on commit 854664a

Please sign in to comment.