Skip to content

Commit

Permalink
IGNITE-3227 - Added partition size method. Fixes #815.
Browse files Browse the repository at this point in the history
  • Loading branch information
samaitra authored and agoncharuk committed Jul 7, 2016
1 parent 77bcabc commit f55cee9
Show file tree
Hide file tree
Showing 8 changed files with 846 additions and 4 deletions.
25 changes: 25 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/IgniteCache.java
Expand Up @@ -369,6 +369,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
@IgniteAsyncSupported
public long sizeLong(CachePeekMode... peekModes) throws CacheException;

/**
* Gets the number of all entries cached in a partition as a long value. By default, if {@code peekModes} value
* isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to
* calling this method with {@link CachePeekMode#PRIMARY} peek mode.
* <p>
* NOTE: this operation is distributed and will query all participating nodes for their partition cache sizes.
*
* @param partition partition.
* @param peekModes Optional peek modes. If not provided, then total partition cache size is returned.
* @return Partion cache size across all nodes.
*/
@IgniteAsyncSupported
public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException;

/**
* Gets the number of all entries cached on this node. By default, if {@code peekModes} value isn't defined,
* only size of primary copies will be returned. This behavior is identical to calling this method with
Expand All @@ -389,6 +403,17 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
*/
public long localSizeLong(CachePeekMode... peekModes);

/**
* Gets the number of all entries cached on this node for the partition as a long value. By default, if {@code peekModes} value isn't
* defined, only size of primary copies will be returned. This behavior is identical to calling this method with
* {@link CachePeekMode#PRIMARY} peek mode.
*
* @param partition partition.
* @param peekModes Optional peek modes. If not provided, then total cache size is returned.
* @return Cache size on this node.
*/
public long localSizeLong(int partition, CachePeekMode... peekModes);

/**
* @param map Map containing keys and entry processors to be applied to values.
* @param args Additional arguments to pass to the {@link EntryProcessor}.
Expand Down
Expand Up @@ -84,6 +84,7 @@
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
Expand Down Expand Up @@ -3840,6 +3841,14 @@ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p
return sizeLongAsync(peekModes).get();
}

/** {@inheritDoc} */
@Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
if (isLocal())
return localSizeLong(partition, peekModes);

return sizeLongAsync(partition, peekModes).get();
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) {
assert peekModes != null;
Expand Down Expand Up @@ -3882,6 +3891,36 @@ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p
new SizeLongTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null);
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Long> sizeLongAsync(final int part, final CachePeekMode[] peekModes) {
assert peekModes != null;

final PeekModes modes = parsePeekModes(peekModes, true);

IgniteClusterEx cluster = ctx.grid().cluster();
final GridCacheAffinityManager aff = ctx.affinity();
final AffinityTopologyVersion topVer = aff.affinityTopologyVersion();

ClusterGroup grp = cluster.forDataNodes(name());

Collection<ClusterNode> nodes = grp.forPredicate(new IgnitePredicate<ClusterNode>() {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode clusterNode) {
return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 &&
((modes.primary && aff.primary(clusterNode, part, topVer)) ||
(modes.backup && aff.backup(clusterNode, part, topVer)));
}
}).nodes();

if (nodes.isEmpty())
return new GridFinishedFuture<>(0L);

ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);

return ctx.kernalContext().task().execute(
new PartitionSizeLongTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, part), null);
}

/** {@inheritDoc} */
@Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException {
return (int)localSizeLong(peekModes);
Expand Down Expand Up @@ -3935,6 +3974,50 @@ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p
return size;
}

/** {@inheritDoc} */
@Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
PeekModes modes = parsePeekModes(peekModes, true);

long size = 0;

AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();

// Swap and offheap are disabled for near cache.
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();

if (ctx.isLocal()){
modes.primary = true;
modes.backup = true;

if (modes.heap)
size += size();

if (modes.swap)
size += swapMgr.swapEntriesCount(0);

if (modes.offheap)
size += swapMgr.offheapEntriesCount(0);
}
else {
GridDhtLocalPartition dhtPart = ctx.topology().localPartition(part, topVer, false);

if (dhtPart != null) {
if (modes.primary && dhtPart.primary(topVer) || modes.backup && dhtPart.backup(topVer)) {
if (modes.heap)
size += dhtPart.publicSize();

if (modes.swap)
size += swapMgr.swapEntriesCount(part);

if (modes.offheap)
size += swapMgr.offheapEntriesCount(part);
}
}
}

return size;
}

/** {@inheritDoc} */
@Override public int size() {
return map.publicSize();
Expand Down Expand Up @@ -5579,6 +5662,52 @@ public String toString() {
}
}

/**
* Internal callable for partition size calculation.
*/
@GridInternal
private static class PartitionSizeLongJob extends TopologyVersionAwareJob {
/** */
private static final long serialVersionUID = 0L;

/** Partition. */
private final int partition;

/** Peek modes. */
private final CachePeekMode[] peekModes;

/**
* @param cacheName Cache name.
* @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
* @param partition partition.
*/
private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, int partition) {
super(cacheName, topVer);

this.peekModes = peekModes;
this.partition = partition;
}

/** {@inheritDoc} */
@Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
if (cache == null)
return 0;

try {
return cache.localSizeLong(partition, peekModes);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}

/** {@inheritDoc} */
public String toString() {
return S.toString(PartitionSizeLongJob.class, this);
}
}

/**
* Internal callable for global size calculation.
*/
Expand Down Expand Up @@ -6470,7 +6599,7 @@ private static class SizeLongTask extends ComputeTaskAdapter<Object, Long> {
* @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
*/
public SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
private SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
this.cacheName = cacheName;
this.topVer = topVer;
this.peekModes = peekModes;
Expand Down Expand Up @@ -6514,6 +6643,90 @@ public SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekM
}
}

/**
* Partition Size Long task.
*/
private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> {
/** */
private static final long serialVersionUID = 0L;

/** */
private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.30");

/** Partition */
private final int partition;

/** Cache name. */
private final String cacheName;

/** Affinity topology version. */
private final AffinityTopologyVersion topVer;

/** Peek modes. */
private final CachePeekMode[] peekModes;

/**
* @param cacheName Cache name.
* @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
* @param partition partition.
*/
private PartitionSizeLongTask(
String cacheName,
AffinityTopologyVersion topVer,
CachePeekMode[] peekModes,
int partition
) {
this.cacheName = cacheName;
this.topVer = topVer;
this.peekModes = peekModes;
this.partition = partition;
}

/** {@inheritDoc} */
@Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
List<ClusterNode> subgrid,
@Nullable Object arg
) throws IgniteException {
Map<ComputeJob, ClusterNode> jobs = new HashMap();

for (ClusterNode node : subgrid)
jobs.put(new PartitionSizeLongJob(cacheName, topVer, peekModes, partition), node);

return jobs;
}

/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
IgniteException e = res.getException();

if (e != null) {
if (e instanceof ClusterTopologyException)
return ComputeJobResultPolicy.WAIT;

throw new IgniteException("Remote job threw exception.", e);
}

return ComputeJobResultPolicy.WAIT;
}

/** {@inheritDoc} */
@Nullable @Override public Long reduce(List<ComputeJobResult> results) throws IgniteException {
long size = 0;

for (ComputeJobResult res : results) {
if (res != null) {
if (res.getException() == null)
size += res.<Long>getData();
else
throw res.getException();
}
}

return size;
}
}

/**
* Clear task.
*/
Expand Down
Expand Up @@ -1471,6 +1471,18 @@ public IgniteInternalCache<K, V> delegate() {
}
}

/** {@inheritDoc} */
@Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.sizeLong(partition, peekModes);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) {
CacheOperationContext prev = gate.enter(opCtx);
Expand All @@ -1495,6 +1507,18 @@ public IgniteInternalCache<K, V> delegate() {
}
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes) {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.sizeLongAsync(partition, peekModes);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
Expand All @@ -1519,6 +1543,18 @@ public IgniteInternalCache<K, V> delegate() {
}
}

/** {@inheritDoc} */
@Override public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.localSizeLong(partition, peekModes);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public int nearSize() {
CacheOperationContext prev = gate.enter(opCtx);
Expand Down

0 comments on commit f55cee9

Please sign in to comment.