Skip to content

Commit

Permalink
IGNITE-45 - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Mar 4, 2015
1 parent 647691f commit 32e26d3
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 55 deletions.
Expand Up @@ -150,8 +150,6 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, Discove
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');
U.debug(log, "Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');

GridAffinityAssignment prev = affCache.get(topVer.previous());

Expand All @@ -164,8 +162,6 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, Discove
// Resolve nodes snapshot for specified topology version.
Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion());

U.debug(log, "Affinity nodes: " + nodes);

sorted = sort(nodes);
}

Expand All @@ -191,8 +187,6 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, Discove

GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);

U.debug(log, "Updated assignment: " + updated);

updated = F.addIfAbsent(affCache, topVer, updated);

// Update top version, if required.
Expand Down
Expand Up @@ -1191,14 +1191,22 @@ public void onCacheStartExchange(DynamicCacheDescriptor startDesc) throws Ignite

startCache(cacheCtx.cache());
onKernalStart(cacheCtx.cache());

caches.put(cacheCtx.name(), cacheCtx.cache());
}

/**
* Callback invoked when first exchange future for dynamic cache is completed.
*
* @param startDesc Cache start descriptor.
*/
@SuppressWarnings("unchecked")
public void onCacheStartFinished(DynamicCacheDescriptor startDesc) {
GridCacheAdapter<?, ?> cache = caches.get(startDesc.cacheConfiguration().getName());

if (cache != null)
jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false));

CacheConfiguration ccfg = startDesc.cacheConfiguration();

DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
Expand Down
Expand Up @@ -221,7 +221,7 @@ private boolean waitForRent() throws IgniteCheckedException {
long updateSeq = this.updateSeq.incrementAndGet();

// If this is the oldest node.
if (oldest.id().equals(loc.id())) {
if (oldest.id().equals(loc.id()) || exchId.isCacheAdded()) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq);

Expand Down
Expand Up @@ -42,4 +42,11 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
* @throws IgniteCheckedException If topology future failed.
*/
public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException;

/**
* Gets topology version of this future.
*
* @return Topology version.
*/
public AffinityTopologyVersion topologyVersion();
}
Expand Up @@ -323,7 +323,6 @@ public void near(GridNearAtomicCache<K, V> near) {
null,
true,
false,
entry,
filter);
}

Expand All @@ -340,7 +339,6 @@ public void near(GridNearAtomicCache<K, V> near) {
null,
false,
false,
entry,
filter);
}

Expand Down Expand Up @@ -425,7 +423,7 @@ public void near(GridNearAtomicCache<K, V> near) {
if (ctx.portableEnabled())
val = (V)ctx.marshalToPortable(val);

return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val));
return removeAllAsync0(F.asList(key), null, true, true, ctx.equalsPeekArray(val));
}

/** {@inheritDoc} */
Expand All @@ -441,7 +439,6 @@ public void near(GridNearAtomicCache<K, V> near) {
null,
true,
true,
null,
ctx.equalsPeekArray(oldVal));
}

Expand All @@ -461,7 +458,6 @@ public void near(GridNearAtomicCache<K, V> near) {
null,
false,
false,
null,
filter);
}

Expand All @@ -482,7 +478,6 @@ public void near(GridNearAtomicCache<K, V> near) {
null,
false,
false,
null,
null);
}

Expand All @@ -498,7 +493,7 @@ public void near(GridNearAtomicCache<K, V> near) {
@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
A.notNull(key, "key");

return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter);
return removeAllAsync0(Collections.singletonList(key), null, true, false, filter);
}

/** {@inheritDoc} */
Expand All @@ -512,7 +507,7 @@ public void near(GridNearAtomicCache<K, V> near) {
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
A.notNull(keys, "keys");

return removeAllAsync0(keys, null, null, false, false, filter);
return removeAllAsync0(keys, null, false, false, filter);
}

/** {@inheritDoc} */
Expand All @@ -527,7 +522,7 @@ public void near(GridNearAtomicCache<K, V> near) {
@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
A.notNull(key, "key");

return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter);
return removeAllAsync0(Collections.singletonList(key), null, false, false, filter);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -565,7 +560,7 @@ public void near(GridNearAtomicCache<K, V> near) {
@Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> conflictMap) {
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());

return removeAllAsync0(null, conflictMap, null, false, false, null);
return removeAllAsync0(null, conflictMap, false, false, null);
}

/**
Expand Down Expand Up @@ -669,7 +664,6 @@ protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>>
null,
true,
false,
null,
null);

return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
Expand Down Expand Up @@ -713,7 +707,6 @@ protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>>
null,
true,
false,
null,
null);
}

Expand Down Expand Up @@ -743,7 +736,6 @@ protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>>
null,
true,
false,
null,
null);
}

Expand All @@ -757,7 +749,6 @@ protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>>
* @param conflictRmvMap Conflict remove map.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param cached Cached cache entry for key. May be passed if and only if map size is {@code 1}.
* @param filter Cache entry filter for atomic updates.
* @return Completion future.
*/
Expand All @@ -770,7 +761,6 @@ private IgniteInternalFuture updateAllAsync0(
@Nullable final Map<? extends K, GridCacheVersion> conflictRmvMap,
final boolean retval,
final boolean rawRetval,
@Nullable GridCacheEntryEx<K, V> cached,
@Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
) {
if (map != null && keyCheck)
Expand All @@ -797,7 +787,6 @@ private IgniteInternalFuture updateAllAsync0(
conflictRmvMap != null ? conflictRmvMap.values() : null,
retval,
rawRetval,
cached,
prj != null ? prj.expiry() : null,
filter,
subjId,
Expand All @@ -817,7 +806,6 @@ private IgniteInternalFuture updateAllAsync0(
*
* @param keys Keys to remove.
* @param conflictMap Conflict map.
* @param cached Cached cache entry for key. May be passed if and only if keys size is {@code 1}.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param filter Cache entry filter for atomic removes.
Expand All @@ -826,7 +814,6 @@ private IgniteInternalFuture updateAllAsync0(
private IgniteInternalFuture removeAllAsync0(
@Nullable final Collection<? extends K> keys,
@Nullable final Map<? extends K, GridCacheVersion> conflictMap,
@Nullable GridCacheEntryEx<K, V> cached,
final boolean retval,
boolean rawRetval,
@Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
Expand Down Expand Up @@ -860,7 +847,6 @@ private IgniteInternalFuture removeAllAsync0(
keys != null ? null : conflictMap.values(),
retval,
rawRetval,
cached,
(filter != null && prj != null) ? prj.expiry() : null,
filter,
subjId,
Expand Down Expand Up @@ -2321,7 +2307,6 @@ else if (req.operation() == UPDATE) {
drRmvVals,
req.returnValue(),
false,
null,
req.expiry(),
req.filter(),
req.subjectId(),
Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
Expand Down Expand Up @@ -107,9 +106,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
/** Return value require flag. */
private final boolean retval;

/** Cached entry if keys size is 1. */
private GridCacheEntryEx<K, V> cached;

/** Expiry policy. */
private final ExpiryPolicy expiryPlc;

Expand Down Expand Up @@ -179,7 +175,6 @@ public GridNearAtomicUpdateFuture() {
* @param conflictRmvVals Conflict remove values (optional).
* @param retval Return value require flag.
* @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
* @param cached Cached entry if keys size is 1.
* @param expiryPlc Expiry policy explicitly specified for cache operation.
* @param filter Entry filter.
* @param subjId Subject ID.
Expand All @@ -197,7 +192,6 @@ public GridNearAtomicUpdateFuture(
@Nullable Collection<GridCacheVersion> conflictRmvVals,
final boolean retval,
final boolean rawRetval,
@Nullable GridCacheEntryEx<K, V> cached,
@Nullable ExpiryPolicy expiryPlc,
final IgnitePredicate<Cache.Entry<K, V>>[] filter,
UUID subjId,
Expand All @@ -210,7 +204,6 @@ public GridNearAtomicUpdateFuture(
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
assert cached == null || keys.size() == 1;
assert subjId != null;

this.cctx = cctx;
Expand All @@ -223,7 +216,6 @@ public GridNearAtomicUpdateFuture(
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
this.retval = retval;
this.cached = cached;
this.expiryPlc = expiryPlc;
this.filter = filter;
this.subjId = subjId;
Expand Down Expand Up @@ -431,18 +423,18 @@ private void updateNear(GridNearAtomicUpdateRequest<K, V> req, GridNearAtomicUpd
private void mapOnTopology(final Collection<? extends K> keys, final boolean remap, final UUID oldNodeId) {
cache.topology().readLock();

GridDiscoveryTopologySnapshot snapshot = null;
AffinityTopologyVersion topVer = null;

try {
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();

if (fut.isDone()) {
topVer = fut.topologyVersion();

if (futVer == null)
// Assign future version in topology read lock before first exception may be thrown.
futVer = cctx.versions().next(topVer);

// We are holding topology read lock and current topology is ready, we can start mapping.
snapshot = fut.topologySnapshot();
}
else {
fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
Expand All @@ -454,23 +446,16 @@ private void mapOnTopology(final Collection<? extends K> keys, final boolean rem
return;
}

topVer = new AffinityTopologyVersion(snapshot.topologyVersion());

mapTime = U.currentTimeMillis();

if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC))
cctx.mvcc().addAtomicFuture(version(), this);
}
catch (IgniteCheckedException e) {
onDone(new IgniteCheckedException("Failed to get topology snapshot for update operation: " + this, e));

return;
}
finally {
cache.topology().readUnlock();
}

map0(snapshot, keys, remap, oldNodeId);
map0(topVer, keys, remap, oldNodeId);
}

/**
Expand All @@ -488,19 +473,17 @@ private synchronized void checkComplete() {
}

/**
* @param topSnapshot Topology snapshot to map on.
* @param keys Keys to map.
* @param remap Flag indicating if this is partial remap for this future.
* @param oldNodeId Old node ID if was remap.
*/
private void map0(GridDiscoveryTopologySnapshot topSnapshot,
private void map0(
AffinityTopologyVersion topVer,
Collection<? extends K> keys,
boolean remap,
@Nullable UUID oldNodeId) {
assert oldNodeId == null || remap;

AffinityTopologyVersion topVer = new AffinityTopologyVersion(topSnapshot.topologyVersion());

Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);

if (F.isEmpty(topNodes)) {
Expand Down
Expand Up @@ -261,6 +261,11 @@ public GridDhtPartitionsExchangeFuture() {
return topSnapshot.get();
}

/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return exchId.topologyVersion();
}

/**
* @return Dummy flag.
*/
Expand Down Expand Up @@ -662,11 +667,14 @@ private boolean spreadPartitions() {

/** {@inheritDoc} */
@Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
if (err == null) {
for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
if (err == null) {
if (!cacheCtx.isLocal())
cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10);
}

if (startDesc != null && F.eq(startDesc.cacheConfiguration().getName(), cacheCtx.name()))
cacheCtx.preloader().onInitialExchangeComplete(err);
}

cctx.exchange().onExchangeDone(this);
Expand Down

0 comments on commit 32e26d3

Please sign in to comment.