Commit 9379cce

Merge remote-tracking branch 'community/ignite-5267-ds' into ignite-5267

ilantukh committed Jun 15, 2017
2 parents f6f6e47 + f21dcd1 commit 9379cce
Showing 142 changed files with 4,477 additions and 702 deletions.
@@ -56,7 +56,7 @@ public class RedisCommonAbstractTest extends GridCommonAbstractTest {

JedisPoolConfig jedisPoolCfg = new JedisPoolConfig();

jedisPoolCfg.setMaxWaitMillis(10000);
jedisPoolCfg.setMaxWaitMillis(20000);
jedisPoolCfg.setMaxIdle(100);
jedisPoolCfg.setMinIdle(1);
jedisPoolCfg.setNumTestsPerEvictionRun(10);
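
For context, a minimal sketch of how a pool built with these settings might be used in a test; the Jedis client classes, host, and port are illustrative assumptions and are not part of this commit:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolSketch {
    public static void main(String[] args) {
        JedisPoolConfig jedisPoolCfg = new JedisPoolConfig();

        // Same settings as the test above; the longer max wait gives slow test machines more headroom.
        jedisPoolCfg.setMaxWaitMillis(20000);
        jedisPoolCfg.setMaxIdle(100);
        jedisPoolCfg.setMinIdle(1);
        jedisPoolCfg.setNumTestsPerEvictionRun(10);

        // Host and port are illustrative assumptions.
        try (JedisPool pool = new JedisPool(jedisPoolCfg, "127.0.0.1", 6379);
             Jedis jedis = pool.getResource()) {
            jedis.set("key", "value");
            System.out.println(jedis.get("key"));
        }
    }
}
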
@@ -200,6 +200,14 @@ public interface CacheMetrics {
*/
public float getOffHeapMissPercentage();

/**
* Gets the number of cache entries in heap memory, including entries held by active transactions, entries in
* onheap cache and near entries.
*
* @return Number of entries in heap memory.
*/
public long getHeapEntriesCount();

/**
* Gets number of entries stored in off-heap memory.
*
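
The new counter is exposed through the same CacheMetrics interface that IgniteCache#metrics() and IgniteCache#localMetrics() return. A minimal sketch of reading it, assuming a single locally started node and an Ignite build that includes this change; statistics must be enabled on the cache or the metric stays at zero:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.configuration.CacheConfiguration;

public class HeapEntriesMetricSketch {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>("demo");

            ccfg.setStatisticsEnabled(true); // metrics are only collected when statistics are on

            IgniteCache<Integer, String> cache = ignite.getOrCreateCache(ccfg);

            cache.put(1, "one");

            CacheMetrics m = cache.localMetrics();

            // New in this commit: on-heap entry count next to the existing off-heap count.
            System.out.println("heap entries: " + m.getHeapEntriesCount());
            System.out.println("off-heap entries: " + m.getOffHeapEntriesCount());
        }
    }
}
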
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.configuration;

import org.apache.ignite.internal.util.typedef.internal.S;

import java.io.Serializable;

/**
@@ -462,7 +464,7 @@ public PersistentStoreConfiguration setTlbSize(int tlbSize) {
}

/**
* Property define how often will be fsync.
* Property define how often will be fsync, in milliseconds.
* In background mode, exist thread which do fsync by timeout.
*
* @return Flush frequency.
@@ -472,7 +474,7 @@ public int getWalFlushFrequency() {
}

/**
* @param walFlushFreq Wal flush frequency.
* @param walFlushFreq Wal flush frequency, in milliseconds.
*/
public PersistentStoreConfiguration setWalFlushFrequency(int walFlushFreq) {
this.walFlushFreq = walFlushFreq;
@@ -481,14 +483,14 @@ public PersistentStoreConfiguration setWalFlushFrequency(int walFlushFreq) {
}

/**
*
* Gets the fsync delay, in nanoseconds.
*/
public int getWalFsyncDelay() {
return walFsyncDelay <= 0 ? DFLT_WAL_FSYNC_DELAY : walFsyncDelay;
}

/**
* @param walFsyncDelay Wal fsync delay.
* @param walFsyncDelay Wal fsync delay, in nanoseconds.
*/
public PersistentStoreConfiguration setWalFsyncDelay(int walFsyncDelay) {
this.walFsyncDelay = walFsyncDelay;
@@ -530,4 +532,9 @@ public PersistentStoreConfiguration setAlwaysWriteFullPages(boolean alwaysWriteF

return this;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PersistentStoreConfiguration.class, this);
}
}
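
With the corrected Javadoc, the units are: flush frequency in milliseconds, fsync delay in nanoseconds. A configuration sketch; it assumes that PersistentStoreConfiguration#setWalMode and IgniteConfiguration#setPersistentStoreConfiguration are available in this version, since neither appears in the diff:

import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.configuration.WALMode;

public class WalTuningSketch {
    public static IgniteConfiguration configure() {
        PersistentStoreConfiguration psCfg = new PersistentStoreConfiguration();

        psCfg.setWalMode(WALMode.BACKGROUND); // background thread fsyncs by timeout (see WALMode below)
        psCfg.setWalFlushFrequency(2000);     // milliseconds, per the corrected Javadoc above
        psCfg.setWalFsyncDelay(1000);         // nanoseconds, per the corrected Javadoc above

        return new IgniteConfiguration().setPersistentStoreConfiguration(psCfg);
    }
}
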
@@ -17,27 +17,44 @@

package org.apache.ignite.configuration;

import org.jetbrains.annotations.Nullable;

/**
* WAL Mode. This enum defines crash recovery guarantees when Ignite persistence is enabled.
*/
public enum WALMode {
/**
*
* Default mode: full-sync disk writes. These writes survive power loss scenarios.
*/
DEFAULT,

/**
*
* Log only mode: flushes application buffers. These writes survive process crash.
*/
LOG_ONLY,

/**
*
* Background mode. Does not force application buffer flush. Data may be lost in case of process crash.
*/
BACKGROUND,

/**
* WAL disabled.
*/
NONE;

/**
* Enumerated values.
*/
private static final WALMode[] VALS = values();

/**
* Efficiently gets enumerated value from its ordinal.
*
* @param ord Ordinal value.
* @return Enumerated value or {@code null} if ordinal out of range.
*/
NONE
}
@Nullable public static WALMode fromOrdinal(int ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
}
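
The VALS/fromOrdinal pair follows the pattern Ignite uses to serialize enums compactly: write the ordinal as a single byte, then map it back on the read side, getting null rather than an exception for out-of-range values. A small sketch of that round trip:

import org.apache.ignite.configuration.WALMode;

public class WalModeOrdinalSketch {
    public static void main(String[] args) {
        // Write side: the ordinal fits into a single byte on the wire.
        byte onWire = (byte)WALMode.LOG_ONLY.ordinal();

        // Read side: map the ordinal back; out-of-range values yield null instead of an exception.
        WALMode decoded = WALMode.fromOrdinal(onWire);
        WALMode invalid = WALMode.fromOrdinal(42);

        System.out.println(decoded); // LOG_ONLY
        System.out.println(invalid); // null
    }
}
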
@@ -88,6 +88,11 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
return cache.clusterMetrics().getOffHeapEntriesCount();
}

/** {@inheritDoc} */
@Override public long getHeapEntriesCount() {
return cache.clusterMetrics().getHeapEntriesCount();
}

/** {@inheritDoc} */
@Override public long getOffHeapPrimaryEntriesCount() {
return cache.clusterMetrics().getOffHeapPrimaryEntriesCount();
@@ -88,6 +88,11 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
return cache.metrics0().getOffHeapEntriesCount();
}

/** {@inheritDoc} */
@Override public long getHeapEntriesCount() {
return cache.metrics0().getHeapEntriesCount();
}

/** {@inheritDoc} */
@Override public long getOffHeapPrimaryEntriesCount() {
return cache.metrics0().getOffHeapPrimaryEntriesCount();
@@ -22,6 +22,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
@@ -37,6 +38,10 @@ public class CacheMetricsImpl implements CacheMetrics {
private static final int REBALANCE_RATE_INTERVAL = IgniteSystemProperties.getInteger(
IgniteSystemProperties.IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL, 60000);

/** Onheap peek modes. */
private static final CachePeekMode[] ONHEAP_PEEK_MODES = new CachePeekMode[] {
CachePeekMode.ONHEAP, CachePeekMode.PRIMARY, CachePeekMode.BACKUP, CachePeekMode.NEAR};

/** */
private static final long NANOS_IN_MICROSECOND = 1000L;

@@ -215,6 +220,16 @@ public void delegate(CacheMetricsImpl delegate) {
return cache != null ? cache.offHeapEntriesCount() : -1;
}

/** {@inheritDoc} */
@Override public long getHeapEntriesCount() {
try {
return cctx.cache().localSize(ONHEAP_PEEK_MODES);
}
catch (IgniteCheckedException ignored) {
return 0;
}
}

/** {@inheritDoc} */
@Override public long getOffHeapPrimaryEntriesCount() {
try {
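
The implementation simply asks the local cache for its size under the ONHEAP, PRIMARY, BACKUP and NEAR peek modes, falling back to 0 if the computation fails. Roughly the same number can be obtained through the public cache API; a sketch, with the helper name chosen here purely for illustration:

import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;

public class HeapSizeSketch {
    /** Approximates the new metric: local on-heap entries plus primary, backup and near entries. */
    static long heapEntries(IgniteCache<?, ?> cache) {
        return cache.localSizeLong(CachePeekMode.ONHEAP, CachePeekMode.PRIMARY,
            CachePeekMode.BACKUP, CachePeekMode.NEAR);
    }
}
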
@@ -95,6 +95,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** Number of entries stored in off-heap memory. */
private long offHeapEntriesCnt;

/** Number of entries stored in heap. */
private long heapEntriesCnt;

/** Number of primary entries stored in off-heap memory. */
private long offHeapPrimaryEntriesCnt;

@@ -258,6 +261,7 @@ public CacheMetricsSnapshot(CacheMetrics m) {
offHeapHits = m.getOffHeapHits();
offHeapMisses = m.getOffHeapMisses();
offHeapEntriesCnt = m.getOffHeapEntriesCount();
heapEntriesCnt = m.getHeapEntriesCount();
offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount();
offHeapBackupEntriesCnt = m.getOffHeapBackupEntriesCount();
offHeapAllocatedSize = m.getOffHeapAllocatedSize();
@@ -354,6 +358,7 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
offHeapHits += e.getOffHeapHits();
offHeapMisses += e.getOffHeapMisses();
offHeapEntriesCnt += e.getOffHeapEntriesCount();
heapEntriesCnt += e.getHeapEntriesCount();
offHeapPrimaryEntriesCnt += e.getOffHeapPrimaryEntriesCount();
offHeapBackupEntriesCnt += e.getOffHeapBackupEntriesCount();
offHeapAllocatedSize += e.getOffHeapAllocatedSize();
@@ -575,6 +580,11 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
return offHeapEntriesCnt;
}

/** {@inheritDoc} */
@Override public long getHeapEntriesCount() {
return heapEntriesCnt;
}

/** {@inheritDoc} */
@Override public long getOffHeapPrimaryEntriesCount() {
return offHeapPrimaryEntriesCnt;
@@ -814,6 +824,7 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
out.writeLong(offHeapHits);
out.writeLong(offHeapMisses);
out.writeLong(offHeapEntriesCnt);
out.writeLong(heapEntriesCnt);
out.writeLong(offHeapPrimaryEntriesCnt);
out.writeLong(offHeapBackupEntriesCnt);
out.writeLong(offHeapAllocatedSize);
@@ -868,6 +879,7 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
offHeapHits = in.readLong();
offHeapMisses = in.readLong();
offHeapEntriesCnt = in.readLong();
heapEntriesCnt = in.readLong();
offHeapPrimaryEntriesCnt = in.readLong();
offHeapBackupEntriesCnt = in.readLong();
offHeapAllocatedSize = in.readLong();
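
Because the snapshot is Externalizable, the new heapEntriesCnt field must occupy the same position in writeExternal and readExternal; a mismatch would silently shift every field written after it. A compact, self-contained illustration of that pattern (the class and its fields are illustrative, not the real snapshot):

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class SnapshotOrderSketch implements Externalizable {
    private long offHeapEntriesCnt;
    private long heapEntriesCnt; // new field: must keep the same slot on both sides
    private long offHeapPrimaryEntriesCnt;

    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(offHeapEntriesCnt);
        out.writeLong(heapEntriesCnt);
        out.writeLong(offHeapPrimaryEntriesCnt);
    }

    @Override public void readExternal(ObjectInput in) throws IOException {
        offHeapEntriesCnt = in.readLong();
        heapEntriesCnt = in.readLong(); // read in exactly the order it was written
        offHeapPrimaryEntriesCnt = in.readLong();
    }
}
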
@@ -30,7 +30,7 @@ public enum CacheType {
USER(true, SYSTEM_POOL),

/**
* Internal cache, should not be visible via public API (caches used by IGFS, Hadoop,).
* Internal cache, should not be visible via public API (caches used by IGFS, Hadoop).
*/
INTERNAL(false, SYSTEM_POOL),

@@ -3917,7 +3917,7 @@ private Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary,
IgniteCacheOffheapManager mgr = ctx.offheap();

return mgr != null ? mgr.cacheEntriesCount(ctx.cacheId(),
false,
true,
true,
ctx.affinity().affinityTopologyVersion()) : -1;
}
@@ -1771,8 +1771,7 @@ public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, Affin
for (DynamicCacheDescriptor desc : started) {
IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();

//Todo check second condition.
if (CU.affinityNode(ctx.discovery().localNode(), filter) || CU.isSystemCache(desc.cacheName())) {
if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
prepareCacheStart(
desc,
null,
@@ -1352,7 +1352,7 @@ else if (locPart.state() == OWNING || locPart.state() == MOVING) {

long updateSeq = this.updateSeq.incrementAndGet();

node2part.updateSequence(updateSeq);
node2part.newUpdateSequence(updateSeq);

boolean changed = false;

@@ -2030,7 +2030,10 @@ private void removeNode(UUID nodeId) {
lock.readLock().lock();

try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
if (node2part == null)
return false;

assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
", grp=" + grp.cacheOrGroupName() +
", stopping=" + stopping +
", locNodeId=" + ctx.localNodeId() +
@@ -387,12 +387,12 @@ public boolean dummyReassign() {
}

/**
* @param cacheId Cache ID.
* @param grpId Cache group ID.
* @param partId Partition ID.
* @return ID of history supplier node or null if it doesn't exist.
*/
@Nullable public UUID partitionHistorySupplier(int cacheId, int partId) {
return partHistSuppliers.getSupplier(cacheId, partId);
@Nullable public UUID partitionHistorySupplier(int grpId, int partId) {
return partHistSuppliers.getSupplier(grpId, partId);
}

/**
@@ -46,16 +46,16 @@ public static IgniteDhtPartitionHistorySuppliersMap empty() {
}

/**
* @param cacheId Cache ID.
* @param grpId Cache group ID.
* @param partId Partition ID.
* @return Supplier UUID.
*/
@Nullable public synchronized UUID getSupplier(int cacheId, int partId) {
@Nullable public synchronized UUID getSupplier(int grpId, int partId) {
if (map == null)
return null;

for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : map.entrySet()) {
if (e.getValue().containsKey(new T2<>(cacheId, partId)))
if (e.getValue().containsKey(new T2<>(grpId, partId)))
return e.getKey();
}

@@ -75,11 +75,11 @@ public static IgniteDhtPartitionHistorySuppliersMap empty() {

/**
* @param nodeId Node ID.
* @param cacheId Cache ID.
* @param grpId Cache group ID.
* @param partId Partition ID.
* @param cntr Partition counter.
*/
public synchronized void put(UUID nodeId, int cacheId, int partId, long cntr) {
public synchronized void put(UUID nodeId, int grpId, int partId, long cntr) {
if (map == null)
map = new HashMap<>();

Expand All @@ -91,7 +91,7 @@ public synchronized void put(UUID nodeId, int cacheId, int partId, long cntr) {
map.put(nodeId, nodeMap);
}

nodeMap.put(new T2<>(cacheId, partId), cntr);
nodeMap.put(new T2<>(grpId, partId), cntr);
}

/**
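
The structure is a per-node map of (cache group ID, partition ID) pairs to partition counters; getSupplier returns the first node that has partition history for the requested pair. A standalone sketch of the same lookup using only JDK types (the class and method names here are illustrative):

import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class HistorySuppliersSketch {
    /** Supplier node -> ((grpId, partId) -> partition counter). */
    private final Map<UUID, Map<SimpleEntry<Integer, Integer>, Long>> map = new HashMap<>();

    /** Records that {@code nodeId} can supply history for the partition starting from {@code cntr}. */
    public synchronized void put(UUID nodeId, int grpId, int partId, long cntr) {
        map.computeIfAbsent(nodeId, id -> new HashMap<>()).put(new SimpleEntry<>(grpId, partId), cntr);
    }

    /** @return First node holding history for the (group, partition) pair, or {@code null} if none. */
    public synchronized UUID getSupplier(int grpId, int partId) {
        SimpleEntry<Integer, Integer> key = new SimpleEntry<>(grpId, partId);

        for (Map.Entry<UUID, Map<SimpleEntry<Integer, Integer>, Long>> e : map.entrySet()) {
            if (e.getValue().containsKey(key))
                return e.getKey();
        }

        return null;
    }
}
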
@@ -17,12 +17,14 @@

package org.apache.ignite.internal.processors.datastructures;

import org.apache.ignite.internal.processors.cache.GridCacheInternal;

import java.io.Serializable;

/**
* Key used to store in utility cache information about created data structures.
*/
public class DataStructureInfoKey implements Serializable {
public class DataStructureInfoKey implements GridCacheInternal, Serializable {
/** */
private static final long serialVersionUID = 0L;

@@ -17,12 +17,14 @@

package org.apache.ignite.internal.processors.datastructures;

import org.apache.ignite.internal.processors.cache.GridCacheInternal;

import java.io.Serializable;

/**
* Internal key for data structures processor.
*/
public class DataStructuresCacheKey implements Serializable {
public class DataStructuresCacheKey implements GridCacheInternal, Serializable {
/** */
private static final long serialVersionUID = 0L;

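
Both keys now carry the GridCacheInternal marker interface, which Ignite uses to distinguish internal utility-cache entries from user data. A hypothetical key following the same pattern (the class below is illustrative and not part of the commit):

import java.io.Serializable;

import org.apache.ignite.internal.processors.cache.GridCacheInternal;

/** Hypothetical internal key following the same pattern as the data structure keys above. */
public class MyInternalKey implements GridCacheInternal, Serializable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Name of the internal structure this key addresses. */
    private final String name;

    public MyInternalKey(String name) {
        this.name = name;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        return this == o || (o instanceof MyInternalKey && name.equals(((MyInternalKey)o).name));
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return name.hashCode();
    }
}
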
