Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 11, 2015
1 parent 332ac70 commit 30a6783
Show file tree
Hide file tree
Showing 19 changed files with 909 additions and 123 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.processors.cache.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.query.*;
Expand Down Expand Up @@ -3816,13 +3817,15 @@ protected void checkJta() throws IgniteCheckedException {
final boolean replicate = ctx.isDrEnabled();
final long topVer = ctx.affinity().affinityTopologyVersion();

final ExpiryPolicy plc = ctx.expiry();

if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);

try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());

LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl);
LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc);

ctx.store().loadCache(c, args);

Expand All @@ -3841,6 +3844,17 @@ protected void checkJta() throws IgniteCheckedException {
throws IgniteException {
assert ver == null;

long ttl = 0;

if (plc != null) {
ttl = CU.toTtl(plc.getExpiryForCreation());

if (ttl == CU.TTL_ZERO)
return;
else if (ttl == CU.TTL_NOT_CHANGED)
ttl = 0;
}

loadEntry(key, val, ver0, p, topVer, replicate, ttl);
}
}, args);
Expand Down Expand Up @@ -3916,6 +3930,10 @@ public IgniteInternalFuture<?> loadAll(
if (!ctx.store().configured())
return new GridFinishedFuture<>(ctx.kernalContext());

GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();

ExpiryPolicy plc = prj != null ? prj.expiry() : null;

if (replaceExisting) {
if (ctx.store().isLocalStore()) {
Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes();
Expand All @@ -3924,7 +3942,7 @@ public IgniteInternalFuture<?> loadAll(
return new GridFinishedFuture<>(ctx.kernalContext());

return ctx.closures().callAsyncNoFailover(BROADCAST,
new LoadKeysCallable<>(ctx.name(), keys, true),
new LoadKeysCallable<>(ctx.name(), keys, true, plc),
nodes,
true);
}
Expand All @@ -3945,7 +3963,7 @@ public IgniteInternalFuture<?> loadAll(
return new GridFinishedFuture<>(ctx.kernalContext());

return ctx.closures().callAsyncNoFailover(BROADCAST,
new LoadKeysCallable<>(ctx.name(), keys, false),
new LoadKeysCallable<>(ctx.name(), keys, false, plc),
nodes,
true);
}
Expand Down Expand Up @@ -3985,19 +4003,25 @@ private void localLoadAndUpdate(final Collection<? extends K> keys) throws Ignit

/**
* @param keys Keys to load.
* @param plc Optional expiry policy.
* @throws IgniteCheckedException If failed.
*/
public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException {
public void localLoad(Collection<? extends K> keys,
@Nullable ExpiryPolicy plc)
throws IgniteCheckedException
{
final boolean replicate = ctx.isDrEnabled();
final long topVer = ctx.affinity().affinityTopologyVersion();

final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();

if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);

try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());

LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0);
LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0);

ctx.store().localStoreLoadAll(null, keys, c);

Expand All @@ -4013,7 +4037,18 @@ public void localLoad(Collection<? extends K> keys) throws IgniteCheckedExceptio

ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() {
@Override public void apply(K key, V val) {
loadEntry(key, val, ver0, null, topVer, replicate, 0);
long ttl = 0;

if (plc0 != null) {
ttl = CU.toTtl(plc0.getExpiryForCreation());

if (ttl == CU.TTL_ZERO)
return;
else if (ttl == CU.TTL_NOT_CHANGED)
ttl = 0;
}

loadEntry(key, val, ver0, null, topVer, replicate, ttl);
}
});
}
Expand Down Expand Up @@ -5923,10 +5958,7 @@ public void future(@Nullable IgniteInternalFuture fut) {
/**
*
*/
protected static class GetExpiryPolicy implements IgniteCacheExpiryPolicy {
/** */
private final long accessTtl;

protected abstract static class GetExpiryPolicy implements IgniteCacheExpiryPolicy {
/** */
private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;

Expand All @@ -5937,30 +5969,30 @@ protected static class GetExpiryPolicy implements IgniteCacheExpiryPolicy {
* @param expiryPlc Expiry policy.
* @return Access expire policy.
*/
public static GetExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) {
@Nullable public static GetExpiryPolicy forPolicy(@Nullable final ExpiryPolicy expiryPlc) {
if (expiryPlc == null)
return null;

Duration duration = expiryPlc.getExpiryForAccess();

if (duration == null)
return null;

return new GetExpiryPolicy(CU.toTtl(duration));
return new GetExpiryPolicy() {
@Override public long forAccess() {
return CU.toTtl(expiryPlc.getExpiryForAccess());
}
};
}

/**
* @param accessTtl TTL for access.
* @param ttl Access TTL.
* @return Access expire policy.
*/
public GetExpiryPolicy(long accessTtl) {
assert accessTtl >= 0 : accessTtl;

this.accessTtl = accessTtl;
}
@Nullable public static GetExpiryPolicy forTtl(final long ttl) {
if (ttl == CU.TTL_NOT_CHANGED)
return null;

/** {@inheritDoc} */
@Override public long forAccess() {
return accessTtl;
return new GetExpiryPolicy() {
@Override public long forAccess() {
return ttl;
}
};
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -6052,6 +6084,9 @@ static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externaliza
/** Update flag. */
private boolean update;

/** */
private ExpiryPolicy plc;

/**
* Required by {@link Externalizable}.
*/
Expand All @@ -6063,12 +6098,16 @@ public LoadKeysCallable() {
* @param cacheName Cache name.
* @param keys Keys.
* @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)}
* otherwise {@link #localLoad(Collection)}.
* otherwise {@link #localLoad(Collection, ExpiryPolicy)}.
*/
LoadKeysCallable(String cacheName, Collection<? extends K> keys, boolean update) {
LoadKeysCallable(String cacheName,
Collection<? extends K> keys,
boolean update,
ExpiryPolicy plc) {
this.cacheName = cacheName;
this.keys = keys;
this.update = update;
this.plc = plc;
}

/** {@inheritDoc} */
Expand All @@ -6083,7 +6122,7 @@ public LoadKeysCallable() {
if (update)
cache.localLoadAndUpdate(keys);
else
cache.localLoad(keys);
cache.localLoad(keys, plc);
}
finally {
cache.context().gate().leave();
Expand All @@ -6099,6 +6138,8 @@ public LoadKeysCallable() {
U.writeCollection(out, keys);

out.writeBoolean(update);

out.writeObject(plc != null ? new IgniteExternalizableExpiryPolicy(plc) : null);
}

/** {@inheritDoc} */
Expand All @@ -6108,6 +6149,8 @@ public LoadKeysCallable() {
keys = U.readCollection(in);

update = in.readBoolean();

plc = (ExpiryPolicy)in.readObject();
}
}

Expand All @@ -6125,17 +6168,19 @@ private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> {
final IgniteDataLoaderImpl<K, V> ldr;

/** */
final long ttl;
final ExpiryPolicy plc;

/**
* @param p Key/value predicate.
* @param ldr Loader.
* @param ttl TTL.
* @param plc Optional expiry policy.
*/
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoaderImpl<K, V> ldr, long ttl) {
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
IgniteDataLoaderImpl<K, V> ldr,
@Nullable ExpiryPolicy plc) {
this.p = p;
this.ldr = ldr;
this.ttl = ttl;
this.plc = plc;

col = new ArrayList<>(ldr.perNodeBufferSize());
}
Expand All @@ -6147,12 +6192,29 @@ private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoa
if (p != null && !p.apply(key, val))
return;

long ttl = 0;

if (plc != null) {
ttl = CU.toTtl(plc.getExpiryForCreation());

if (ttl == CU.TTL_ZERO)
return;
else if (ttl == CU.TTL_NOT_CHANGED)
ttl = 0;
}

if (ctx.portableEnabled()) {
key = (K)ctx.marshalToPortable(key);
val = (V)ctx.marshalToPortable(val);
}

GridCacheRawVersionedEntry<K,V> e = new GridCacheRawVersionedEntry<>(key, null, val, null, ttl, 0, ver);
GridCacheRawVersionedEntry<K,V> e = new GridCacheRawVersionedEntry<>(key,
null,
val,
null,
ttl,
0,
ver);

e.marshal(ctx.marshaller());

Expand Down

0 comments on commit 30a6783

Please sign in to comment.