Skip to content
Browse files

ISPN-1271 - Consistent Hash externalizer loses grouping configuration

As a workaround, skip the consistent hash check on the node receiving state.
  • Loading branch information...
1 parent 34467b8 commit 46c7346245c9b46123d94698d3c6f601abf4dd5e Dan Berindei committed
Showing with 20 additions and 27 deletions.
  1. +20 −27 core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
View
47 core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
@@ -350,31 +350,27 @@ public TransactionLogger getTransactionLogger() {
return transactionLogger;
}
- private Map<Object, InternalCacheValue> applyStateMap(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state, boolean withRetry) {
+ private Map<Object, InternalCacheValue> applyStateMap(Map<Object, InternalCacheValue> state, boolean withRetry) {
Map<Object, InternalCacheValue> retry = withRetry ? new HashMap<Object, InternalCacheValue>() : null;
for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
- if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(getSelf())) {
- InternalCacheValue v = e.getValue();
- InvocationContext ctx = icc.createInvocationContext();
- // locking not necessary in the case of a join since the node isn't doing anything else
- // TODO what if the node is already running?
- ctx.setFlags(CACHE_MODE_LOCAL, SKIP_CACHE_LOAD, SKIP_REMOTE_LOOKUP, SKIP_SHARED_CACHE_STORE, SKIP_LOCKING,
- SKIP_OWNERSHIP_CHECK);
- try {
- PutKeyValueCommand put = cf.buildPutKeyValueCommand(e.getKey(), v.getValue(), v.getLifespan(), v.getMaxIdle(), ctx.getFlags());
- interceptorChain.invoke(ctx, put);
- } catch (Exception ee) {
- if (withRetry) {
- if (trace)
- log.tracef("Problem %s encountered when applying state for key %s. Adding entry to retry queue.", ee.getMessage(), e.getKey());
- retry.put(e.getKey(), e.getValue());
- } else {
- log.problemApplyingStateForKey(ee.getMessage(), e.getKey());
- }
+ InternalCacheValue v = e.getValue();
+ InvocationContext ctx = icc.createInvocationContext();
+ // locking not necessary in the case of a join since the node isn't doing anything else
+ // TODO what if the node is already running?
+ ctx.setFlags(CACHE_MODE_LOCAL, SKIP_CACHE_LOAD, SKIP_REMOTE_LOOKUP, SKIP_SHARED_CACHE_STORE, SKIP_LOCKING,
+ SKIP_OWNERSHIP_CHECK);
+ try {
+ PutKeyValueCommand put = cf.buildPutKeyValueCommand(e.getKey(), v.getValue(), v.getLifespan(), v.getMaxIdle(), ctx.getFlags());
+ interceptorChain.invoke(ctx, put);
+ } catch (Exception ee) {
+ if (withRetry) {
+ if (trace)
+ log.tracef("Problem %s encountered when applying state for key %s. Adding entry to retry queue.", ee.getMessage(), e.getKey());
+ retry.put(e.getKey(), e.getValue());
+ } else {
+ log.problemApplyingStateForKey(ee.getMessage(), e.getKey());
}
- } else {
- log.keyDoesNotMapToLocalNode(e.getKey(), consistentHash.locate(e.getKey(), configuration.getNumOwners()));
}
}
return retry;
@@ -385,12 +381,9 @@ public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheV
Address sender, int viewId) throws InterruptedException {
waitForJoinToStart();
- // use the sender's CH if his view is newer
- // an use our CH if the our view is newer
- ConsistentHash latestCH = consistentHash;
if (viewId < lastViewId) {
log.debugf("Rejecting state pushed by node %s for old rehash %d (last view id is %d)", sender, viewId, lastViewId);
- latestCH = getConsistentHash();
+ return;
}
log.debugf("Applying new state from %s: received %d keys", sender, state.size());
@@ -398,11 +391,11 @@ public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheV
int retryCount = 3; // in case we have issues applying state.
Map<Object, InternalCacheValue> pendingApplications = state;
for (int i = 0; i < retryCount; i++) {
- pendingApplications = applyStateMap(latestCH, pendingApplications, true);
+ pendingApplications = applyStateMap(pendingApplications, true);
if (pendingApplications.isEmpty()) break;
}
// one last go
- if (!pendingApplications.isEmpty()) applyStateMap(latestCH, pendingApplications, false);
+ if (!pendingApplications.isEmpty()) applyStateMap(pendingApplications, false);
if(trace) log.tracef("After applying state data container has %d keys", dataContainer.size());
}

0 comments on commit 46c7346

Please sign in to comment.
Something went wrong with that request. Please try again.