Skip to content

Commit

Permalink
# Merged 6.6.3 fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Feb 5, 2015
1 parent 46160c9 commit a86ae90
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 44 deletions.
Expand Up @@ -1507,29 +1507,23 @@ else if (log.isDebugEnabled())
private void onSegmentation() {
GridSegmentationPolicy segPlc = ctx.config().getSegmentationPolicy();

// Always disconnect first.
try {
getSpi().disconnect();
}
catch (IgniteSpiException e) {
U.error(log, "Failed to disconnect discovery SPI.", e);
}

switch (segPlc) {
case RESTART_JVM:
try {
getSpi().disconnect();
}
catch (IgniteSpiException e) {
U.error(log, "Failed to disconnect discovery SPI.", e);
}

U.warn(log, "Restarting JVM according to configured segmentation policy.");

restartJvm();

break;

case STOP:
try {
getSpi().disconnect();
}
catch (IgniteSpiException e) {
U.error(log, "Failed to disconnect discovery SPI.", e);
}

U.warn(log, "Stopping local node according to configured segmentation policy.");

stopNode();
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.affinity;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
Expand All @@ -35,6 +36,8 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.cache.CacheDistributionMode.*;

/**
* Affinity cached function.
*/
Expand Down Expand Up @@ -121,6 +124,7 @@ public void initialize(long topVer, List<List<ClusterNode>> affAssignment) {
* @param topVer Topology version to calculate affinity cache for.
* @param discoEvt Discovery event that caused this topology version change.
*/
@SuppressWarnings("IfMayBeConditional")
public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent discoEvt) {
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
Expand All @@ -142,8 +146,23 @@ public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent disco

List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment();

List<List<ClusterNode>> assignment = aff.assignPartitions(
new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups));
List<List<ClusterNode>> assignment;

if (prevAssignment != null && discoEvt != null) {
CacheDistributionMode distroMode = U.distributionMode(discoEvt.eventNode(), ctx.name());

if (distroMode == null || // no cache on node.
distroMode == CLIENT_ONLY || distroMode == NEAR_ONLY)
assignment = prevAssignment;
else
assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment,
discoEvt, topVer, backups));
}
else
assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt,
topVer, backups));

assert assignment != null;

GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);

Expand Down
Expand Up @@ -1167,8 +1167,7 @@ else if (interceptorVal != val) {

CacheMode mode = cctx.config().getCacheMode();

if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
(tx != null && tx.local() && !isNear()))
if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false);

cctx.dataStructures().onEntryUpdated(key, false);
Expand Down Expand Up @@ -1329,8 +1328,7 @@ else if (log.isDebugEnabled())

CacheMode mode = cctx.config().getCacheMode();

if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
(tx != null && tx.local() && !isNear()))
if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false);

cctx.dataStructures().onEntryUpdated(key, true);
Expand Down Expand Up @@ -2144,7 +2142,7 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi
if (res)
updateMetrics(op, metrics);

if (primary || cctx.isReplicated())
if (primary)
cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false);

cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
Expand Down Expand Up @@ -3143,7 +3141,7 @@ else if (deletedUnlocked())
drReplicate(drType, val, valBytes, ver);

if (!skipQryNtf) {
if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) {
if (cctx.affinity().primary(cctx.localNode(), key, topVer)) {
cctx.continuousQueries().onEntryUpdate(this,
key,
val,
Expand Down
Expand Up @@ -35,7 +35,7 @@
import java.util.*;
import java.util.concurrent.locks.*;

import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheDistributionMode.*;

/**
* Continuous query implementation.
Expand Down Expand Up @@ -228,27 +228,39 @@ public void execute(@Nullable ClusterGroup prj,
prj = prj.forCacheNodes(ctx.name());

if (prj.nodes().isEmpty())
throw new ClusterTopologyCheckedException("Failed to execute query (projection is empty): " + this);
throw new ClusterTopologyCheckedException("Failed to continuous execute query (projection is empty): " +
this);

CacheMode mode = ctx.config().getCacheMode();
boolean skipPrimaryCheck = false;

if (mode == LOCAL || mode == REPLICATED) {
Collection<ClusterNode> nodes = prj.nodes();
Collection<ClusterNode> nodes = prj.nodes();

ClusterNode node = nodes.contains(ctx.localNode()) ? ctx.localNode() : F.rand(nodes);
if (nodes.isEmpty())
throw new ClusterTopologyCheckedException("Failed to execute continuous query (empty projection is " +
"provided): " + this);

assert node != null;
switch (ctx.config().getCacheMode()) {
case LOCAL:
if (!nodes.contains(ctx.localNode()))
throw new ClusterTopologyCheckedException("Continuous query for LOCAL cache can be executed " +
"only locally (provided projection contains remote nodes only): " + this);
else if (nodes.size() > 1)
U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " +
"ignored): " + this);

if (nodes.size() > 1) {
if (node.id().equals(ctx.localNodeId()))
U.warn(log, "Continuous query for " + mode + " cache can be run only on local node. " +
"Will execute query locally: " + this);
else
U.warn(log, "Continuous query for " + mode + " cache can be run only on single node. " +
"Will execute query on remote node [qry=" + this + ", node=" + node + ']');
}
prj = prj.forNode(ctx.localNode());

prj = prj.forNode(node);
break;

case REPLICATED:
if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) {
CacheDistributionMode distributionMode = ctx.config().getDistributionMode();

if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED)
skipPrimaryCheck = true;
}

break;
}

closeLock.lock();
Expand All @@ -271,6 +283,7 @@ public void execute(@Nullable ClusterGroup prj,
entryLsnr,
sync,
oldVal,
skipPrimaryCheck,
taskNameHash,
keepPortable);

Expand Down
Expand Up @@ -84,6 +84,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Keep portable flag. */
private boolean keepPortable;

/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;

/**
* Required by {@link Externalizable}.
*/
Expand All @@ -103,6 +106,7 @@ public GridCacheContinuousQueryHandler() {
* @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
* @param sync {@code True} if query created for synchronous {@link CacheEntryListener}.
* @param oldVal {@code True} if old value is required.
* @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
* @param taskHash Task name hash code.
*/
GridCacheContinuousQueryHandler(@Nullable String cacheName,
Expand All @@ -114,6 +118,7 @@ public GridCacheContinuousQueryHandler() {
boolean entryLsnr,
boolean sync,
boolean oldVal,
boolean skipPrimaryCheck,
int taskHash,
boolean keepPortable) {
assert topic != null;
Expand All @@ -131,6 +136,7 @@ public GridCacheContinuousQueryHandler() {
this.oldVal = oldVal;
this.taskHash = taskHash;
this.keepPortable = keepPortable;
this.skipPrimaryCheck = skipPrimaryCheck;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -184,16 +190,21 @@ public GridCacheContinuousQueryHandler() {
}

@Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
GridCacheContext<K, V> cctx = cacheContext(ctx);

if (cctx.isReplicated() && !skipPrimaryCheck && !e.primary())
return;

boolean notify;

CacheFlag[] f = cacheContext(ctx).forceLocalRead();
CacheFlag[] f = cctx.forceLocalRead();

try {
notify = (prjPred == null || checkProjection(e)) &&
(filter == null || filter.apply(e));
}
finally {
cacheContext(ctx).forceFlags(f);
cctx.forceFlags(f);
}

if (notify) {
Expand Down
Expand Up @@ -1276,15 +1276,15 @@ private static class RemoteRoutineInfo {
* @return Object to send or {@code null} if there is nothing to send for now.
*/
@Nullable Collection<Object> add(@Nullable Object obj) {
Collection<Object> toSnd = null;
ConcurrentLinkedDeque8 buf0 = null;

if (buf.sizex() >= bufSize - 1) {
lock.writeLock().lock();

try {
buf.add(obj);

toSnd = buf;
buf0 = buf;

buf = new ConcurrentLinkedDeque8<>();

Expand All @@ -1306,7 +1306,16 @@ private static class RemoteRoutineInfo {
}
}

return toSnd != null ? new ArrayList<>(toSnd) : null;
Collection<Object> toSnd = null;

if (buf0 != null) {
toSnd = new ArrayList<>(buf0.sizex());

for (Object o : buf0)
toSnd.add(o);
}

return toSnd;
}

/**
Expand Down
Expand Up @@ -174,4 +174,11 @@ public interface GridPortableInputStream extends GridPortableStream {
* @return Remaining data.
*/
public int remaining();

/**
* Length of data inside array.
*
* @param len Length.
*/
public void length(int len);
}
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;

import java.util.*;
import java.util.concurrent.*;
Expand All @@ -45,6 +46,7 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu
/**
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
public void testRemoteNodeCallback() throws Exception {
GridCache<Integer, Integer> cache1 = grid(0).cache(null);

Expand Down Expand Up @@ -79,4 +81,67 @@ public void testRemoteNodeCallback() throws Exception {

assertEquals(10, val.get().intValue());
}

/**
* Ensure that every node see every update.
*
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
public void testCrossCallback() throws Exception {
// Prepare.
GridCache<Integer, Integer> cache1 = grid(0).cache(null);
GridCache<Integer, Integer> cache2 = grid(1).cache(null);

final int key1 = primaryKey(cache1);
final int key2 = primaryKey(cache2);

final CountDownLatch latch1 = new CountDownLatch(2);
final CountDownLatch latch2 = new CountDownLatch(2);


// Start query on the first node.
CacheContinuousQuery<Integer, Integer> qry1 = cache1.queries().createContinuousQuery();

qry1.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
@Override public boolean apply(UUID nodeID,
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
for (CacheContinuousQueryEntry entry : entries) {
log.info("Update in cache 1: " + entry);

if (entry.getKey() == key1 || entry.getKey() == key2)
latch1.countDown();
}

return latch1.getCount() != 0;
}
});

qry1.execute();

// Start query on the second node.
CacheContinuousQuery<Integer, Integer> qry2 = cache2.queries().createContinuousQuery();

qry2.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
@Override public boolean apply(UUID nodeID,
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
for (CacheContinuousQueryEntry entry : entries) {
log.info("Update in cache 2: " + entry);

if (entry.getKey() == key1 || entry.getKey() == key2)
latch2.countDown();
}

return latch2.getCount() != 0;
}
});

qry2.execute();

cache1.put(key1, key1);
cache1.put(key2, key2);

assert latch1.await(LATCH_TIMEOUT, MILLISECONDS);
assert latch2.await(LATCH_TIMEOUT, MILLISECONDS);
}
}
Expand Up @@ -194,7 +194,7 @@ private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJo
}
});

qry.execute();
qry.execute(ctx.kernalContext().grid().forLocal());

ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(final IgniteEvent evt) {
Expand Down

0 comments on commit a86ae90

Please sign in to comment.