Skip to content

Commit

Permalink
# IGNITE-283: Fixed DR start issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov committed Feb 19, 2015
1 parent c93d86f commit 47fa3ce
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 28 deletions.
Expand Up @@ -320,7 +320,12 @@ public GridCacheContext(
expiryPlc = null;

itHolder = new CacheWeakQueryIteratorsHolder(log);
}

/**
* Initialize conflict resolver after all managers are started.
*/
void initConflictResolver() {
// Conflict resolver is determined in two stages:
// 1. If DR receiver hub is enabled, then pick it from DR manager.
// 2. Otherwise instantiate default resolver in case local store is configured.
Expand Down
Expand Up @@ -854,13 +854,17 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
mgr.start(cacheCtx);

cacheCtx.initConflictResolver();

if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();

// Start DHT managers.
for (GridCacheManager mgr : dhtManagers(dhtCtx))
mgr.start(dhtCtx);

dhtCtx.initConflictResolver();

// Start DHT cache.
dhtCtx.cache().start();

Expand Down
Expand Up @@ -509,17 +509,16 @@ private void commitIfLocked() throws IgniteCheckedException {
if (txEntry.ttl() == CU.TTL_ZERO)
op = DELETE;


boolean drNeedResolve = cacheCtx.conflictNeedResolve();

if (drNeedResolve) {
IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
drRes = conflictResolve(op, txEntry.key(), val, valBytes,
txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);
if (drNeedResolve) {
IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
drRes = conflictResolve(op, txEntry.key(), val, valBytes,
txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);

assert drRes != null;

GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();

if (drCtx.isUseOld())
op = NOOP;
Expand Down
Expand Up @@ -85,13 +85,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
/** Optional arguments for entry processor. */
private Object[] invokeArgs;

/** DR put values. */
/** Conflict put values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheDrInfo<V>> drPutVals;
private Collection<GridCacheDrInfo<V>> conflictPutVals;

/** DR remove values. */
/** Conflict remove values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> drRmvVals;
private Collection<GridCacheVersion> conflictRmvVals;

/** Mappings. */
@GridToStringInclude
Expand Down Expand Up @@ -174,8 +174,8 @@ public GridNearAtomicUpdateFuture() {
* @param keys Keys to update.
* @param vals Values or transform closure.
* @param invokeArgs Optional arguments for entry processor.
* @param drPutVals DR put values (optional).
* @param drRmvVals DR remove values (optional).
* @param conflictPutVals Conflict put values (optional).
* @param conflictRmvVals Conflict remove values (optional).
* @param retval Return value require flag.
* @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
* @param cached Cached entry if keys size is 1.
Expand All @@ -192,8 +192,8 @@ public GridNearAtomicUpdateFuture(
Collection<? extends K> keys,
@Nullable Collection<?> vals,
@Nullable Object[] invokeArgs,
@Nullable Collection<GridCacheDrInfo<V>> drPutVals,
@Nullable Collection<GridCacheVersion> drRmvVals,
@Nullable Collection<GridCacheDrInfo<V>> conflictPutVals,
@Nullable Collection<GridCacheVersion> conflictRmvVals,
final boolean retval,
final boolean rawRetval,
@Nullable GridCacheEntryEx<K, V> cached,
Expand All @@ -207,8 +207,8 @@ public GridNearAtomicUpdateFuture(
this.rawRetval = rawRetval;

assert vals == null || vals.size() == keys.size();
assert drPutVals == null || drPutVals.size() == keys.size();
assert drRmvVals == null || drRmvVals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
assert cached == null || keys.size() == 1;
assert subjId != null;

Expand All @@ -219,8 +219,8 @@ public GridNearAtomicUpdateFuture(
this.keys = keys;
this.vals = vals;
this.invokeArgs = invokeArgs;
this.drPutVals = drPutVals;
this.drRmvVals = drRmvVals;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
this.retval = retval;
this.cached = cached;
this.expiryPlc = expiryPlc;
Expand Down Expand Up @@ -530,19 +530,19 @@ private void map0(GridDiscoveryTopologySnapshot topSnapshot,
drExpireTime = -1;
drVer = null;
}
else if (drPutVals != null) {
GridCacheDrInfo<V> drPutVal = F.first(drPutVals);
else if (conflictPutVals != null) {
GridCacheDrInfo<V> drPutVal = F.first(conflictPutVals);

val = drPutVal.value();
drTtl = drPutVal.ttl();
drExpireTime = drPutVal.expireTime();
drVer = drPutVal.version();
}
else if (drRmvVals != null) {
else if (conflictRmvVals != null) {
val = null;
drTtl = -1;
drExpireTime = -1;
drVer = F.first(drRmvVals);
drVer = F.first(conflictRmvVals);
}
else {
val = null;
Expand Down Expand Up @@ -616,13 +616,13 @@ else if (drRmvVals != null) {

Iterator<GridCacheDrInfo<V>> drPutValsIt = null;

if (drPutVals != null)
drPutValsIt = drPutVals.iterator();
if (conflictPutVals != null)
drPutValsIt = conflictPutVals.iterator();

Iterator<GridCacheVersion> drRmvValsIt = null;

if (drRmvVals != null)
drRmvValsIt = drRmvVals.iterator();
if (conflictRmvVals != null)
drRmvValsIt = conflictRmvVals.iterator();

Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);

Expand Down Expand Up @@ -661,15 +661,15 @@ else if (drRmvVals != null) {
throw err;
}
}
else if (drPutVals != null) {
else if (conflictPutVals != null) {
GridCacheDrInfo<V> drPutVal = drPutValsIt.next();

val = drPutVal.value();
drTtl = drPutVal.ttl();
drExpireTime = drPutVal.expireTime();
drVer = drPutVal.version();
}
else if (drRmvVals != null) {
else if (conflictRmvVals != null) {
val = null;
drTtl = -1;
drExpireTime = -1;
Expand Down

0 comments on commit 47fa3ce

Please sign in to comment.