Skip to content

Commit

Permalink
#IGNITE-99: Add different cache affinity functions to tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilinets committed Jan 27, 2015
1 parent 16298c5 commit ca53075
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 122 deletions.
Expand Up @@ -103,7 +103,7 @@ public List<ClusterNode> get(int part) {
public Set<Integer> primaryPartitions(UUID nodeId) { public Set<Integer> primaryPartitions(UUID nodeId) {
Set<Integer> set = primary.get(nodeId); Set<Integer> set = primary.get(nodeId);


return set == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(set); return set == null ? Collections.<Integer>emptySet() : set;
} }


/** /**
Expand All @@ -115,7 +115,7 @@ public Set<Integer> primaryPartitions(UUID nodeId) {
public Set<Integer> backupPartitions(UUID nodeId) { public Set<Integer> backupPartitions(UUID nodeId) {
Set<Integer> set = backup.get(nodeId); Set<Integer> set = backup.get(nodeId);


return set == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(set); return set == null ? Collections.<Integer>emptySet() : set;
} }


/** /**
Expand Down
Expand Up @@ -77,7 +77,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {


// Clean up affinity functions if such cache no more exists. // Clean up affinity functions if such cache no more exists.
if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) {
Collection<String> caches = new HashSet<>(); final Collection<String> caches = new HashSet<>();


for (ClusterNode clusterNode : ((IgniteDiscoveryEvent)evt).topologyNodes()) for (ClusterNode clusterNode : ((IgniteDiscoveryEvent)evt).topologyNodes())
caches.addAll(U.cacheNames(clusterNode)); caches.addAll(U.cacheNames(clusterNode));
Expand Down Expand Up @@ -495,6 +495,20 @@ private AffinityInfo(CacheAffinityFunction affFunc, CacheAffinityKeyMapper mappe
this.portableEnabled = portableEnabled; this.portableEnabled = portableEnabled;
} }


/**
* @return Cache affinity function.
*/
private CacheAffinityFunction affFunc() {
return affFunc;
}

/**
* @return Affinity assignment.
*/
private GridAffinityAssignment assignment() {
return assignment;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(AffinityInfo.class, this); return S.toString(AffinityInfo.class, this);
Expand Down Expand Up @@ -567,7 +581,7 @@ public CacheAffinityProxy(String cacheName) {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
return cache().affFunc.partitions(); return cache().affFunc().partitions();
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand All @@ -582,7 +596,7 @@ public CacheAffinityProxy(String cacheName) {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
return cache().affFunc.partition(key); return cache().affFunc().partition(key);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand All @@ -597,8 +611,7 @@ public CacheAffinityProxy(String cacheName) {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
return cache() return cache().assignment().primaryPartitions(n.id()).contains(partition(key));
.assignment.primaryPartitions(n.id()).contains(partition(key));
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand All @@ -608,17 +621,12 @@ public CacheAffinityProxy(String cacheName) {
} }
} }


private AffinityInfo cache() throws IgniteCheckedException {
return affinityCache(cacheName, topologyVersion());
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean isBackup(ClusterNode n, K key) { @Override public boolean isBackup(ClusterNode n, K key) {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
return cache() return cache().assignment().backupPartitions(n.id()).contains(partition(key));
.assignment.backupPartitions(n.id()).contains(partition(key));
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand All @@ -645,7 +653,7 @@ private AffinityInfo cache() throws IgniteCheckedException {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
Set<Integer> parts = cache().assignment.primaryPartitions(n.id()); Set<Integer> parts = cache().assignment().primaryPartitions(n.id());


return U.toIntArray(parts); return U.toIntArray(parts);
} }
Expand All @@ -662,7 +670,7 @@ private AffinityInfo cache() throws IgniteCheckedException {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
Set<Integer> parts = cache().assignment.backupPartitions(n.id()); Set<Integer> parts = cache().assignment().backupPartitions(n.id());


return U.toIntArray(parts); return U.toIntArray(parts);
} }
Expand All @@ -679,21 +687,12 @@ private AffinityInfo cache() throws IgniteCheckedException {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
Collection<Integer> parts = new HashSet<>(); GridAffinityAssignment assignment = cache().assignment();

AffinityInfo affInfo = cache();

for (int partsCnt = affInfo.affFunc.partitions(), part = 0; part < partsCnt; part++) {
for (ClusterNode affNode : affInfo.assignment.get(part)) {
if (n.id().equals(affNode.id())) {
parts.add(part);


break; int[] primary = U.toIntArray(assignment.primaryPartitions(n.id()));
} int[] backup = U.toIntArray(assignment.backupPartitions(n.id()));
}
}


return U.toIntArray(parts); return U.addAll(primary, backup);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand Down Expand Up @@ -769,7 +768,7 @@ private AffinityInfo cache() throws IgniteCheckedException {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
return F.first(cache().assignment.get(part)); return F.first(cache().assignment().get(part));
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand Down Expand Up @@ -803,7 +802,7 @@ private AffinityInfo cache() throws IgniteCheckedException {
ctx.gateway().readLock(); ctx.gateway().readLock();


try { try {
return cache().assignment.get(part); return cache().assignment().get(part);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand All @@ -813,6 +812,13 @@ private AffinityInfo cache() throws IgniteCheckedException {
} }
} }


/**
* @return Affinity info for current topology version.
*/
private AffinityInfo cache() throws IgniteCheckedException {
return affinityCache(cacheName, topologyVersion());
}

/** /**
* @return Topology version. * @return Topology version.
*/ */
Expand Down
Expand Up @@ -6245,6 +6245,15 @@ public static int[] toIntArray(@Nullable Collection<Integer> c) {
return arr; return arr;
} }


public static int[] addAll(int[] arr1, int[] arr2) {
int[] all = new int[arr1.length + arr2.length];

System.arraycopy(arr1, 0, all, 0, arr1.length);
System.arraycopy(arr2, 0, all, arr1.length, arr2.length);

return all;
}

/** /**
* Converts array of integers into list. * Converts array of integers into list.
* *
Expand Down
Expand Up @@ -2,6 +2,9 @@


import org.apache.ignite.cache.*; import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cache.affinity.consistenthash.*;
import org.apache.ignite.cache.affinity.fair.*;
import org.apache.ignite.cache.affinity.rendezvous.*;
import org.apache.ignite.cluster.*; import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
Expand All @@ -12,12 +15,18 @@
/** /**
* Tests for {@link org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.CacheAffinityProxy}. * Tests for {@link org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.CacheAffinityProxy}.
*/ */
public abstract class IgniteCacheAffinityAbstractTest extends IgniteCacheAbstractTest { public class IgniteCacheAffinityTest extends IgniteCacheAbstractTest {
/** Initial grid count. */ /** Initial grid count. */
private int GRID_COUNT = 3; private int GRID_COUNT = 3;


/** Cache name */ /** Cache name */
private final String CACHE1 = "cache1"; private final String CACHE1 = "ConsistentCache";

/** Cache name */
private final String CACHE2 = "PartitionFairCache";

/** Cache name */
private final String CACHE3 = "RendezvousCache";


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected int gridCount() { @Override protected int gridCount() {
Expand All @@ -32,22 +41,39 @@ public abstract class IgniteCacheAffinityAbstractTest extends IgniteCacheAbstrac


CacheConfiguration cache1 = cacheConfiguration(null); CacheConfiguration cache1 = cacheConfiguration(null);
cache1.setName(CACHE1); cache1.setName(CACHE1);
cache1.setAffinity(new CacheConsistentHashAffinityFunction());

CacheConfiguration cache2 = cacheConfiguration(null);
cache2.setName(CACHE2);
cache2.setAffinity(new CachePartitionFairAffinity());

CacheConfiguration cache3 = cacheConfiguration(null);
cache3.setName(CACHE3);
cache3.setAffinity(new CacheRendezvousAffinityFunction());


if (gridName.contains("0")) if (gridName.contains("0"))
cfg.setCacheConfiguration();
else if (gridName.contains("1"))
cfg.setCacheConfiguration(cache0); cfg.setCacheConfiguration(cache0);
else else
cfg.setCacheConfiguration(cache0, cache1); cfg.setCacheConfiguration(cache0, cache1, cache2, cache3);


return cfg; return cfg;
} }


/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return CacheMode.PARTITIONED;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() { @Override protected CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.TRANSACTIONAL; return CacheAtomicityMode.TRANSACTIONAL;
} }


/** {@inheritDoc} */
@Override protected CacheDistributionMode distributionMode() {
return CacheDistributionMode.NEAR_PARTITIONED;
}

/** /**
* Throws Exception if failed. * Throws Exception if failed.
*/ */
Expand All @@ -59,20 +85,30 @@ public void testAffinity() throws Exception {
} }


GridCache<String, Integer> cache0 = grid(1).cache(null); GridCache<String, Integer> cache0 = grid(1).cache(null);
GridCache<String, Integer> cache1 = grid(2).cache(CACHE1); GridCache<String, Integer> cache1 = grid(1).cache(CACHE1);
GridCache<String, Integer> cache2 = grid(1).cache(CACHE2);
GridCache<String, Integer> cache3 = grid(1).cache(CACHE3);


for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
cache0.put(Integer.toString(i), i); cache0.put(Integer.toString(i), i);


for (int i = 10; i < 20; ++i) for (int i = 10; i < 20; ++i)
cache1.put(Integer.toString(i), i); cache1.put(Integer.toString(i), i);


for (int i = 20; i < 30; ++i)
cache2.put(Integer.toString(i), i);

for (int i = 30; i < 40; ++i)
cache3.put(Integer.toString(i), i);

checkAffinity(); checkAffinity();


stopGrid(gridCount() - 1);

startGrid(gridCount() - 1);
startGrid(gridCount()); startGrid(gridCount());
startGrid(gridCount() + 1);


GRID_COUNT += 2; GRID_COUNT += 1;


checkAffinity(); checkAffinity();
} }
Expand All @@ -81,31 +117,21 @@ public void testAffinity() throws Exception {
* Check CacheAffinityProxy methods. * Check CacheAffinityProxy methods.
*/ */
private void checkAffinity() { private void checkAffinity() {
for (int i = 0; i < gridCount(); ++i) { checkGridAffinity(grid(0).affinity(null), grid(1).jcache(null), grid(1).cache(null));
if (grid(i).cachex(null) != null)
checkAffinity(grid(i).<String, Integer>jcache(null), grid(i).<String, Integer>cache(null));


if (grid(i).cachex(CACHE1) != null) checkGridAffinity(grid(0).affinity(CACHE1), grid(1).jcache(CACHE1), grid(1).cache(CACHE1));
checkAffinity(grid(i).<String, Integer>jcache(CACHE1), grid(i).<String, Integer>cache(CACHE1));
}
}


/** checkGridAffinity(grid(0).affinity(CACHE2), grid(1).jcache(CACHE2), grid(1).cache(CACHE2));
* @param jcache Jcache to iterate over.
* @param cache Cache to check. checkGridAffinity(grid(0).affinity(CACHE3), grid(1).jcache(CACHE3), grid(1).cache(CACHE3));
*/
private void checkAffinity(IgniteCache<String, Integer> jcache, GridCache<String, Integer> cache) {
for (int i = 0; i < gridCount(); ++i)
checkGridAffinity(grid(i).<String>affinity(cache.name()), jcache, cache);
} }


/** /**
* @param testAff Cache affinity to test. * @param testAff Cache affinity to test.
* @param jcache Ignite cache. * @param jcache Ignite cache.
* @param cache Cache. * @param cache Cache.
*/ */
private void checkGridAffinity(CacheAffinity<String> testAff, IgniteCache<String, Integer> jcache, private void checkGridAffinity(CacheAffinity testAff, IgniteCache jcache, GridCache cache) {
GridCache<String, Integer> cache) {
checkAffinityKey(testAff, jcache, cache.affinity()); checkAffinityKey(testAff, jcache, cache.affinity());


checkPartitions(testAff, cache.affinity()); checkPartitions(testAff, cache.affinity());
Expand All @@ -118,12 +144,11 @@ private void checkGridAffinity(CacheAffinity<String> testAff, IgniteCache<String
/** /**
* Check mapKeyToNode, mapKeyToPrimaryAndBackups and mapPartitionToNode methods. * Check mapKeyToNode, mapKeyToPrimaryAndBackups and mapPartitionToNode methods.
*/ */
private void checkMapKeyToNode(CacheAffinity<String> testAff, private void checkMapKeyToNode(CacheAffinity testAff, IgniteCache jcache, CacheAffinity aff) {
IgniteCache<String, Integer> jcache, CacheAffinity<String> aff) { Iterator<Cache.Entry> iter = jcache.iterator();
Iterator<Cache.Entry<String, Integer>> iter = jcache.iterator();


while (iter.hasNext()) { while (iter.hasNext()) {
Cache.Entry<String, Integer> entry = iter.next(); Cache.Entry entry = iter.next();


UUID node1 = testAff.mapKeyToNode(entry.getKey()).id(); UUID node1 = testAff.mapKeyToNode(entry.getKey()).id();
UUID node2 = aff.mapKeyToNode(entry.getKey()).id(); UUID node2 = aff.mapKeyToNode(entry.getKey()).id();
Expand All @@ -144,12 +169,11 @@ private void checkMapKeyToNode(CacheAffinity<String> testAff,
/** /**
* Check affinityKey method. * Check affinityKey method.
*/ */
private void checkAffinityKey(CacheAffinity<String> testAff, private void checkAffinityKey(CacheAffinity testAff, IgniteCache jcache, CacheAffinity aff) {
IgniteCache<String, Integer> jcache, CacheAffinity<String> aff) { Iterator<Cache.Entry> iter = jcache.iterator();
Iterator<Cache.Entry<String, Integer>> iter = jcache.iterator();


while (iter.hasNext()) { while (iter.hasNext()) {
Cache.Entry<String, Integer> entry = iter.next(); Cache.Entry entry = iter.next();


assertEquals(testAff.affinityKey(entry.getKey()), (aff.affinityKey(entry.getKey()))); assertEquals(testAff.affinityKey(entry.getKey()), (aff.affinityKey(entry.getKey())));
} }
Expand All @@ -158,13 +182,12 @@ private void checkAffinityKey(CacheAffinity<String> testAff,
/** /**
* Check isBackup, isPrimary and isPrimaryOrBackup methods. * Check isBackup, isPrimary and isPrimaryOrBackup methods.
*/ */
private void checkIsBackupOrPrimary(CacheAffinity<String> testAff, IgniteCache<String, Integer> jcache, private void checkIsBackupOrPrimary(CacheAffinity testAff, IgniteCache jcache, CacheAffinity aff) {
CacheAffinity<String> aff) {


Iterator<Cache.Entry<String, Integer>> iter = jcache.iterator(); Iterator<Cache.Entry> iter = jcache.iterator();


while (iter.hasNext()) { while (iter.hasNext()) {
Cache.Entry<String, Integer> entry = iter.next(); Cache.Entry entry = iter.next();


for (ClusterNode n : nodes()) { for (ClusterNode n : nodes()) {
assertEquals(testAff.isBackup(n, entry.getKey()), aff.isBackup(n, entry.getKey())); assertEquals(testAff.isBackup(n, entry.getKey()), aff.isBackup(n, entry.getKey()));
Expand All @@ -179,7 +202,7 @@ private void checkIsBackupOrPrimary(CacheAffinity<String> testAff, IgniteCache<S
/** /**
* Check allPartitions, backupPartitions and primaryPartitions methods. * Check allPartitions, backupPartitions and primaryPartitions methods.
*/ */
private void checkPartitions(CacheAffinity<String> testAff, CacheAffinity<String> aff) { private void checkPartitions(CacheAffinity testAff, CacheAffinity aff) {
for (ClusterNode n : nodes()) { for (ClusterNode n : nodes()) {
checkEqualIntArray(testAff.allPartitions(n), aff.allPartitions(n)); checkEqualIntArray(testAff.allPartitions(n), aff.allPartitions(n));


Expand Down Expand Up @@ -207,7 +230,6 @@ private void checkEqualIntArray(int[] arr1, int[] arr2) {
} }


assertEquals(0, col1.size()); assertEquals(0, col1.size());

} }


/** /**
Expand Down

0 comments on commit ca53075

Please sign in to comment.