Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/ignite-gg-8.0.1.ea7' into ignite…
Browse files Browse the repository at this point in the history
…-gg-8.0.2.ea1

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
#	modules/core/src/main/resources/META-INF/classnames.properties
#	modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
  • Loading branch information
EdShangGG committed Dec 12, 2016
2 parents eb6fdbf + 962daa8 commit 47e0116
Show file tree
Hide file tree
Showing 40 changed files with 559 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
Expand Down Expand Up @@ -136,8 +137,8 @@ public GridEventConsumeHandler() {
}

/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
Map<Integer, Long> cntrs) {
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
// No-op.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
Expand Down Expand Up @@ -125,8 +126,8 @@ public GridMessageListenHandler(GridMessageListenHandler orig) {
}

/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
Map<Integer, Long> cntrs) {
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
// No-op.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private long absolute(long relativePtr) {

pageIdx &= idxMask;

long offset = pageIdx * sysPageSize;
long offset = ((long)pageIdx) * sysPageSize;

return pagesBase + offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager {
* @param partId Partition ID.
* @throws IgniteCheckedException If failed to handle partition destroy callback.
*/
public void onPartitionDestroyed(int cacheId, int partId) throws IgniteCheckedException;
public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException;

/**
* Reads a page for the given cache ID. Cache ID may be {@code 0} if the page is a meta page.
Expand Down Expand Up @@ -115,7 +115,7 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager {
* @param pageBuf Page buffer to write.
* @throws IgniteCheckedException If failed to write page.
*/
public void write(int cacheId, long pageId, ByteBuffer pageBuf) throws IgniteCheckedException;
public void write(int cacheId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException;

/**
* Gets page offset within the page store file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public interface PageStore {
* @param pageBuf Page buffer to write.
* @throws IgniteCheckedException If page writing failed (IO error occurred).
*/
public void write(long pageId, ByteBuffer pageBuf) throws IgniteCheckedException;
public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException;

/**
* Gets page offset within the store file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public enum CacheState {
*/
INACTIVE,

/** Cache is inactive. But process of it activation in progress. */
ACTIVATING,

/**
* Cache is active and operations. There are no lost partitions.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
Expand Down Expand Up @@ -4555,9 +4556,7 @@ protected Object readResolve() throws ObjectStreamException {

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> rebalance() {
ctx.preloader().forcePreload();

return ctx.preloader().syncFuture();
return ctx.preloader().forceRebalance();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.lang.GridFunc;
Expand Down Expand Up @@ -1952,6 +1953,13 @@ public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, Affinity
return affinityNode() && rebalanceEnabled() && hasPartition(part, affNodes, topVer);
}

/**
* @return {@code True} if fast eviction is allowed.
*/
public boolean allowFastEviction() {
return shared().database().persistenceEnabled() && !GridQueryProcessor.isEnabled(cacheCfg);
}

/**
* @param part Partition.
* @param affNodes Affinity nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,13 @@ public void forceDummyExchange(boolean reassign,
*
* @param exchFut Exchange future.
*/
public void forcePreloadExchange(GridDhtPartitionsExchangeFuture exchFut) {
public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

exchWorker.addFuture(
new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId()));
new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));

return fut;
}

/**
Expand Down Expand Up @@ -1794,7 +1798,8 @@ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
forcePreload,
waitList,
cnt);
cnt,
exchFut.forcedRebalanceFuture());

if (r != null) {
U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -91,7 +92,8 @@ public interface GridCachePreloader {
public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload,
Collection<String> caches,
int cnt);
int cnt,
@Nullable GridFutureAdapter<Boolean> forcedRebFut);

/**
* @param p Preload predicate.
Expand Down Expand Up @@ -151,9 +153,9 @@ public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest
AffinityTopologyVersion topVer);

/**
* Force preload process.
* Force Rebalance process.
*/
public void forcePreload();
public IgniteInternalFuture<Boolean> forceRebalance();

/**
* Unwinds undeploys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -89,8 +90,8 @@ public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
}

/** {@inheritDoc} */
@Override public void forcePreload() {
// No-op.
@Override public IgniteInternalFuture<Boolean> forceRebalance() {
return new GridFinishedFuture<>(true);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -167,7 +168,7 @@ public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {

/** {@inheritDoc} */
@Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
Collection<String> caches, int cnt) {
Collection<String> caches, int cnt, @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
Expand All @@ -53,6 +55,7 @@
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
Expand Down Expand Up @@ -93,6 +96,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** */
private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000);

/** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

/** {@inheritDoc} */
@Override public GridAtomicLong globalRemoveId() {
return globalRmvId;
Expand Down Expand Up @@ -148,6 +154,13 @@ protected void initDataStructures() throws IgniteCheckedException {
destroyCacheDataStructures(destroy);
}

/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
super.onKernalStop0(cancel);

busyLock.block();
}

/**
*
*/
Expand Down Expand Up @@ -845,69 +858,88 @@ public CacheDataStoreImpl(
key.valueBytes(cctx.cacheObjectContext());
val.valueBytes(cctx.cacheObjectContext());

rowStore.addRow(dataRow);
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");

assert dataRow.link() != 0 : dataRow;
try {
rowStore.addRow(dataRow);

DataRow old = dataTree.put(dataRow);
assert dataRow.link() != 0 : dataRow;

if (old == null)
storageSize.incrementAndGet();
DataRow old = dataTree.put(dataRow);

if (indexingEnabled) {
GridCacheQueryManager qryMgr = cctx.queries();
if (old == null)
storageSize.incrementAndGet();

assert qryMgr.enabled();
if (indexingEnabled) {
GridCacheQueryManager qryMgr = cctx.queries();

if (old != null)
qryMgr.store(key, p, old.value(), old.version(), val, ver, expireTime, dataRow.link());
else
qryMgr.store(key, p, null, null, val, ver, expireTime, dataRow.link());
}
assert qryMgr.enabled();

if (old != null) {
assert old.link() != 0 : old;
if (old != null)
qryMgr.store(key, p, old.value(), old.version(), val, ver, expireTime, dataRow.link());
else
qryMgr.store(key, p, null, null, val, ver, expireTime, dataRow.link());
}

if (pendingEntries != null && old.expireTime() != 0)
pendingEntries.remove(new PendingRow(old.expireTime(), old.link()));
if (old != null) {
assert old.link() != 0 : old;

rowStore.removeRow(old.link());
}
if (pendingEntries != null && old.expireTime() != 0)
pendingEntries.remove(new PendingRow(old.expireTime(), old.link()));

rowStore.removeRow(old.link());
}

if (pendingEntries != null && expireTime != 0) {
pendingEntries.put(new PendingRow(expireTime, dataRow.link()));

if (pendingEntries != null && expireTime != 0)
pendingEntries.put(new PendingRow(expireTime, dataRow.link()));
cctx.ttl().onPendingEntryAdded(expireTime);
}
}
finally {
busyLock.leaveBusy();
}
}

/** {@inheritDoc} */
@Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException {
DataRow dataRow = dataTree.remove(new KeySearchRow(key.hashCode(), key, 0));
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");

CacheObject val = null;
GridCacheVersion ver = null;
try {
DataRow dataRow = dataTree.remove(new KeySearchRow(key.hashCode(), key, 0));

if (dataRow != null) {
assert dataRow.link() != 0 : dataRow;
CacheObject val = null;
GridCacheVersion ver = null;

if (pendingEntries != null && dataRow.expireTime() != 0)
pendingEntries.remove(new PendingRow(dataRow.expireTime(), dataRow.link()));
if (dataRow != null) {
assert dataRow.link() != 0 : dataRow;

storageSize.decrementAndGet();
if (pendingEntries != null && dataRow.expireTime() != 0)
pendingEntries.remove(new PendingRow(dataRow.expireTime(), dataRow.link()));

val = dataRow.value();
storageSize.decrementAndGet();

ver = dataRow.version();
}
val = dataRow.value();

if (indexingEnabled) {
GridCacheQueryManager qryMgr = cctx.queries();
ver = dataRow.version();
}

assert qryMgr.enabled();
if (indexingEnabled) {
GridCacheQueryManager qryMgr = cctx.queries();

qryMgr.remove(key, partId, val, ver);
}
assert qryMgr.enabled();

qryMgr.remove(key, partId, val, ver);
}

if (dataRow != null)
rowStore.removeRow(dataRow.link());
if (dataRow != null)
rowStore.removeRow(dataRow.link());
}
finally {
busyLock.leaveBusy();
}
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2359,9 +2359,7 @@ private void onLeave(GridCacheGateway<K, V> gate) {

/** {@inheritDoc} */
@Override public IgniteFuture<?> rebalance() {
ctx.preloader().forcePreload();

return new IgniteFutureImpl<>(ctx.preloader().syncFuture());
return new IgniteFutureImpl<>(ctx.preloader().forceRebalance());
}

/** {@inheritDoc} */
Expand Down

0 comments on commit 47e0116

Please sign in to comment.