Continuous queries fixes:
- flush the backup queue on exchange end (otherwise we do not really wait for all current operations)
- on the coordinator, apply counters only after all single messages are received (otherwise extra counter increments are possible)
- do not send info about filtered entries if there is no non-filtered entry
- added system properties for previously hardcoded constants (see the sketch after the file stats below)
sboikov committed May 29, 2017
1 parent 827b7f6 commit 42293fa
Showing 25 changed files with 1,885 additions and 792 deletions.
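The fourth bullet of the commit message is not covered by the hunks shown on this page. As a rough illustration of the pattern it describes, a hardcoded constant typically becomes overridable through IgniteSystemProperties; the property name and default below are invented for the example, not the ones this commit actually adds.

import org.apache.ignite.IgniteSystemProperties;

class BackupQueueTuning {
    /** Hypothetical property name and default; the real ones live in files not shown here. */
    static final int BACKUP_ACK_THRESHOLD =
        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100);
}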
@@ -874,7 +874,11 @@ private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure
finally {
// Reset thread local context.
cctx.tm().resetContext();
-cctx.mvcc().contextReset();
+
+GridCacheMvccManager mvcc = cctx.mvcc();
+
+if (mvcc != null)
+mvcc.contextReset();

// Unwind eviction notifications.
if (msg instanceof IgniteTxStateAware) {
@@ -17,7 +17,6 @@

package org.apache.ignite.internal.processors.cache;

-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -29,7 +28,6 @@
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
-
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -62,8 +60,8 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
-import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -76,7 +74,6 @@
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -1312,7 +1312,7 @@ else if (!cacheCtx.isLocal())
top = cacheCtx.topology();

if (top != null) {
-updated |= top.update(null, entry.getValue(), null) != null;
+updated |= top.update(null, entry.getValue()) != null;

cctx.affinity().checkRebalanceState(top, cacheId);
}
@@ -649,12 +649,30 @@ public long lastUpdateSequence() {
}
}

+/** {@inheritDoc} */
+@Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+assert cntrMap != null;
+
+lock.writeLock().lock();
+
+try {
+for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+if (cntr == null || cntr.get2() < e.getValue().get2())
+this.cntrMap.put(e.getKey(), e.getValue());
+}
+}
+finally {
+lock.writeLock().unlock();
+}
+}
+
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(
@Nullable GridDhtPartitionExchangeId exchId,
-GridDhtPartitionMap parts,
-Map<Integer, T2<Long, Long>> cntrMap
+GridDhtPartitionMap parts
) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -733,15 +751,6 @@ public long lastUpdateSequence() {
}
}

-if (cntrMap != null) {
-for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
-if (cntr == null || cntr.get2() < e.getValue().get2())
-this.cntrMap.put(e.getKey(), e.getValue());
-}
-}
-
consistencyCheck();

if (log.isDebugEnabled())
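Throughout these topology hunks, cntrMap maps a partition id to a T2<Long, Long> pair, and the code compares only get2(). A reasonable reading (an assumption here, not stated in the diff) is that get2() carries the partition's current update counter, so an incoming entry wins only if it carries a higher counter. A self-contained sketch of that merge rule:

import java.util.HashMap;
import java.util.Map;

import org.apache.ignite.internal.util.typedef.T2;

public class CounterMergeSketch {
    public static void main(String[] args) {
        // Coordinator-side map: partition 5 is already known with counter 150.
        Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
        cntrMap.put(5, new T2<>(100L, 150L));

        // Entry reported by another node with a smaller second component.
        T2<Long, Long> incoming = new T2<>(100L, 140L);

        T2<Long, Long> cur = cntrMap.get(5);

        // Same guard as in applyUpdateCounters() above: keep the larger get2().
        if (cur == null || cur.get2() < incoming.get2())
            cntrMap.put(5, incoming);

        // Prints 150: the stale entry was ignored.
        System.out.println(cntrMap.get(5).get2());
    }
}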
@@ -234,12 +234,15 @@ public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
/**
* @param exchId Exchange ID.
* @param parts Partitions.
-* @param cntrMap Partition update counters.
* @return Local partition map if there were evictions or {@code null} otherwise.
*/
@Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-GridDhtPartitionMap parts,
-@Nullable Map<Integer, T2<Long, Long>> cntrMap);
+GridDhtPartitionMap parts);

+/**
+* @param cntrMap Counters map.
+*/
+public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
+
/**
* Checks if there is at least one owner for each partition in the cache topology.
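The interface change above splits what used to be a single update(exchId, parts, cntrMap) call into two steps. A hedged sketch of the resulting call pattern follows (import paths are assumed from the Ignite source tree of this era); as the exchange-future hunks further down show, the coordinator now defers the second step until all single messages have arrived:

import java.util.Map;

import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;

class SplitUpdateSketch {
    /** Applies a node's partition map immediately and its counters separately (possibly later). */
    static void apply(GridDhtPartitionTopology top, @Nullable GridDhtPartitionExchangeId exchId,
        GridDhtPartitionMap parts, @Nullable Map<Integer, T2<Long, Long>> cntrMap) {
        top.update(exchId, parts);            // step 1: partition states only

        if (cntrMap != null)
            top.applyUpdateCounters(cntrMap); // step 2: counters, once they are complete
    }
}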
@@ -1255,12 +1255,46 @@ private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer)
}
}

+/** {@inheritDoc} */
+@Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+assert cntrMap != null;
+
+lock.writeLock().lock();
+
+try {
+if (stopping)
+return;
+
+for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+if (cntr == null || cntr.get2() < e.getValue().get2())
+this.cntrMap.put(e.getKey(), e.getValue());
+}
+
+for (int i = 0; i < locParts.length(); i++) {
+GridDhtLocalPartition part = locParts.get(i);
+
+if (part == null)
+continue;
+
+T2<Long, Long> cntr = cntrMap.get(part.id());
+
+if (cntr != null && cntr.get2() > part.updateCounter())
+part.updateCounter(cntr.get2());
+}
+}
+finally {
+lock.writeLock().unlock();
+
+}
+}
+
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(
@Nullable GridDhtPartitionExchangeId exchId,
-GridDhtPartitionMap parts,
-@Nullable Map<Integer, T2<Long, Long>> cntrMap
+GridDhtPartitionMap parts
) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -1279,27 +1313,6 @@ private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer)
if (stopping)
return null;

-if (cntrMap != null) {
-for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
-if (cntr == null || cntr.get2() < e.getValue().get2())
-this.cntrMap.put(e.getKey(), e.getValue());
-}
-
-for (int i = 0; i < locParts.length(); i++) {
-GridDhtLocalPartition part = locParts.get(i);
-
-if (part == null)
-continue;
-
-T2<Long, Long> cntr = cntrMap.get(part.id());
-
-if (cntr != null && cntr.get2() > part.updateCounter())
-part.updateCounter(cntr.get2());
-}
-}
-
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
@@ -34,7 +34,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.PartitionLossPolicy;
@@ -47,18 +46,19 @@
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -789,14 +789,11 @@ private void distributedExchange() throws IgniteCheckedException {

boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;

-//todo check
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
continue;

if (topChanged) {
-cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
-
// Partition release future is done so we can flush the write-behind store.
cacheCtx.store().forceFlush();
}
@@ -1101,10 +1098,31 @@ private void sendPartitions(ClusterNode oldestNode) {
}
}

+/**
+* @return {@code True} if exchange triggered by server node join or fail.
+*/
+private boolean serverNodeDiscoveryEvent() {
+assert discoEvt != null;
+
+return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
+}
+
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
boolean realExchange = !dummy && !forcePreload;

+if (err == null &&
+realExchange &&
+!cctx.kernalContext().clientNode() &&
+(serverNodeDiscoveryEvent() || affChangeMsg != null)) {
+for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
+continue;
+
+cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion());
+}
+}
+
if (err == null && realExchange) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
@@ -1554,6 +1572,25 @@ private void onAllReceived() {
}
}

+for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
+if (msg instanceof GridDhtPartitionsSingleMessage) {
+GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+
+for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
+Integer cacheId = entry.getKey();
+GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
+cctx.exchange().clientTopology(cacheId, this);
+
+Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId);
+
+if (cntrs != null)
+top.applyUpdateCounters(cntrs);
+}
+}
+}
+
if (discoEvt.type() == EVT_NODE_JOINED) {
if (cctx.kernalContext().state().active())
assignPartitionsStates();
@@ -1785,7 +1822,7 @@ private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleM
GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
cctx.exchange().clientTopology(cacheId, this);

-top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+top.update(exchId, entry.getValue());
}
}

