Skip to content

Commit

Permalink
ignite-5075 Implement logical 'cache groups' sharing the same physica…
Browse files Browse the repository at this point in the history
…l caches
  • Loading branch information
sboikov committed Jun 6, 2017
1 parent eaf8e14 commit db85d16
Show file tree
Hide file tree
Showing 249 changed files with 13,546 additions and 3,450 deletions.
Expand Up @@ -202,6 +202,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Cache name. */
private String name;

/** Cache group name. */
private String grpName;

/** Name of {@link MemoryPolicyConfiguration} for this cache */
private String memPlcName;

Expand Down Expand Up @@ -408,6 +411,7 @@ public CacheConfiguration(CompleteConfiguration<K, V> cfg) {
evictFilter = cc.getEvictionFilter();
evictPlc = cc.getEvictionPolicy();
expiryPolicyFactory = cc.getExpiryPolicyFactory();
grpName = cc.getGroupName();
indexedTypes = cc.getIndexedTypes();
interceptor = cc.getInterceptor();
invalidate = cc.isInvalidate();
Expand Down Expand Up @@ -454,6 +458,23 @@ public CacheConfiguration(CompleteConfiguration<K, V> cfg) {
writeSync = cc.getWriteSynchronizationMode();
}

/**
* @return Cache group name.
*/
public String getGroupName() {
return grpName;
}

/**
* @param grpName Cache group name.
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setGroupName(String grpName) {
this.grpName = grpName;

return this;
}

/**
* Cache name or {@code null} if not provided, then this will be considered a default
* cache which can be accessed via {@link Ignite#cache(String)} method. Otherwise, if name
Expand Down
Expand Up @@ -1153,10 +1153,14 @@ private void processRegularMessage(
pools.poolForPolicy(plc).execute(c);
}
catch (RejectedExecutionException e) {
U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " +
"message in the listener thread instead.", e);
if (!ctx.isStopping()) {
U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " +
"message in the listener thread instead.", e);

c.run();
c.run();
}
else if (log.isDebugEnabled())
log.debug("Failed to process regular message due to execution rejection: " + msg);
}
}

Expand Down
Expand Up @@ -69,7 +69,7 @@ public class DiscoCache {

/** Affinity cache nodes by cache name. */
@GridToStringInclude
private final Map<Integer, List<ClusterNode>> affCacheNodes;
private final Map<Integer, List<ClusterNode>> cacheGrpAffNodes;

/** Node map. */
private final Map<UUID, ClusterNode> nodeMap;
Expand All @@ -91,7 +91,7 @@ public class DiscoCache {
* @param allNodesWithCaches All nodes with at least one cache configured.
* @param rmtNodesWithCaches Remote nodes with at least one cache configured.
* @param allCacheNodes Cache nodes by cache name.
* @param affCacheNodes Affinity cache nodes by cache name.
* @param cacheGrpAffNodes Affinity nodes by cache group ID.
* @param nodeMap Node map.
* @param nearEnabledCaches Caches where at least one node has near cache enabled.
* @param alives Alive nodes.
Expand All @@ -105,7 +105,7 @@ public class DiscoCache {
List<ClusterNode> allNodesWithCaches,
List<ClusterNode> rmtNodesWithCaches,
Map<Integer, List<ClusterNode>> allCacheNodes,
Map<Integer, List<ClusterNode>> affCacheNodes,
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
Set<Integer> nearEnabledCaches,
Set<UUID> alives) {
Expand All @@ -118,7 +118,7 @@ public class DiscoCache {
this.allNodesWithCaches = allNodesWithCaches;
this.rmtNodesWithCaches = rmtNodesWithCaches;
this.allCacheNodes = allCacheNodes;
this.affCacheNodes = affCacheNodes;
this.cacheGrpAffNodes = cacheGrpAffNodes;
this.nodeMap = nodeMap;
this.nearEnabledCaches = nearEnabledCaches;
this.alives.addAll(alives);
Expand Down Expand Up @@ -235,25 +235,11 @@ public List<ClusterNode> cacheNodes(Integer cacheId) {
}

/**
* Gets all nodes that have cache with given ID and should participate in affinity calculation. With
* partitioned cache nodes with near-only cache do not participate in affinity node calculation.
*
* @param cacheName Cache name.
* @return Collection of nodes.
*/
public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) {
return cacheAffinityNodes(CU.cacheId(cacheName));
}

/**
* Gets all nodes that have cache with given ID and should participate in affinity calculation. With
* partitioned cache nodes with near-only cache do not participate in affinity node calculation.
*
* @param cacheId Cache ID.
* @return Collection of nodes.
* @param grpId Cache group ID.
* @return All nodes that participate in affinity calculation.
*/
public List<ClusterNode> cacheAffinityNodes(int cacheId) {
return emptyIfNull(affCacheNodes.get(cacheId));
public List<ClusterNode> cacheGroupAffinityNodes(int grpId) {
return emptyIfNull(cacheGrpAffNodes.get(grpId));
}

/**
Expand Down

0 comments on commit db85d16

Please sign in to comment.