Skip to content

Commit

Permalink
IGNITE-45 - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Mar 4, 2015
1 parent 55a9c50 commit 30d96ad
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 56 deletions.
Expand Up @@ -166,6 +166,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Custom event listener. */
private GridPlainInClosure<Serializable> customEvtLsnr;

/** Map of dynamic cache filters. */
private Map<String, IgnitePredicate<ClusterNode>> dynamicCacheFilters = new HashMap<>();

/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
Expand Down Expand Up @@ -214,6 +217,18 @@ public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ve
getSpi().setNodeAttributes(attrs, ver);
}

/**
* Adds dynamic cache filters.
*
* @param cacheName Cache name.
* @param filter Cache filter.
*/
public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) {
IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter);

assert old == null;
}

/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
discoOrdered = discoOrdered();
Expand Down Expand Up @@ -277,10 +292,19 @@ public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ve
c.updateAlives(node);
}

if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
try {
customEvtLsnr.apply(data);
}
catch (Exception e) {
U.error(log, "Failed to notify direct custom event listener: " + data, e);
}
}

// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
if (type != EVT_NODE_METRICS_UPDATED && type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
if (type != EVT_NODE_METRICS_UPDATED) {
DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));

discoCacheHist.put(topVer, cache);
Expand All @@ -307,15 +331,6 @@ public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ve
return;
}

if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
try {
customEvtLsnr.apply(data);
}
catch (Exception e) {
U.error(log, "Failed to notify direct custom event listener: " + data, e);
}
}

if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)) {
boolean set = GridDiscoveryManager.this.topVer.setIfGreater(topVer);

Expand Down Expand Up @@ -1834,6 +1849,8 @@ private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {

GridCacheAttributes[] caches = node.attribute(ATTR_CACHE);

boolean hasCaches = false;

if (caches != null) {
nodesWithCaches.add(node);

Expand All @@ -1860,6 +1877,35 @@ private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {
}
}

hasCaches = true;
}

for (Map.Entry<String, IgnitePredicate<ClusterNode>> entry : dynamicCacheFilters.entrySet()) {
String cacheName = entry.getKey();
IgnitePredicate<ClusterNode> filter = entry.getValue();

if (filter.apply(node)) {
addToMap(cacheMap, cacheName, node);

if (alive(node.id()))
addToMap(aliveCacheNodes, maskNull(cacheName), node);

addToMap(dhtNodesMap, cacheName, node);

// TODO IGNITE-45 client and near caches.

if (!loc.id().equals(node.id())) {
addToMap(rmtCacheMap, cacheName, node);

if (alive(node.id()))
addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
}

hasCaches = true;
}
}

if (hasCaches) {
if (alive(node.id())) {
aliveNodesWithCaches.add(node);

Expand Down
Expand Up @@ -89,17 +89,34 @@ public AffinityTopologyVersion previous() {

/** {@inheritDoc} */
@Override public int compareTo(AffinityTopologyVersion o) {
return Long.compare(topVer, o.topVer);
int cmp = Long.compare(topVer, o.topVer);

if (cmp == 0)
return Integer.compare(minorTopVer, o.minorTopVer);

return cmp;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
return o instanceof AffinityTopologyVersion && topVer == ((AffinityTopologyVersion)o).topVer;
if (this == o)
return true;

if (!(o instanceof AffinityTopologyVersion))
return false;

AffinityTopologyVersion that = (AffinityTopologyVersion)o;

return minorTopVer == that.minorTopVer && topVer == that.topVer;
}

/** {@inheritDoc} */
@Override public int hashCode() {
return (int)topVer;
int result = (int)(topVer ^ (topVer >>> 32));

result = 31 * result + minorTopVer;

return result;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -150,6 +150,8 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, Discove
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');
U.debug(log, "Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');

GridAffinityAssignment prev = affCache.get(topVer.previous());

Expand All @@ -162,6 +164,8 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, Discove
// Resolve nodes snapshot for specified topology version.
Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion());

U.debug(log, "Affinity nodes: " + nodes);

sorted = sort(nodes);
}

Expand All @@ -187,6 +191,8 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, Discove

GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);

U.debug(log, "Updated assignment: " + updated);

updated = F.addIfAbsent(affCache, topVer, updated);

// Update top version, if required.
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
Expand All @@ -28,27 +29,34 @@
* Cache start descriptor.
*/
public class DynamicCacheDescriptor implements Serializable {
/** Cache start ID. */
private IgniteUuid startId;

/** Cache configuration. */
@GridToStringExclude
private CacheConfiguration cacheCfg;

/** Deploy filter bytes. */
@GridToStringExclude
private byte[] deployFltrBytes;

/** Cache start ID. */
private IgniteUuid startId;
private IgnitePredicate<ClusterNode> nodeFilter;

/**
* @param cacheCfg Cache configuration.
* @param deployFltrBytes Deployment filter bytes.
* @param nodeFilter Node filter.
*/
public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] deployFltrBytes, IgniteUuid startId) {
public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid startId) {
this.cacheCfg = cacheCfg;
this.deployFltrBytes = deployFltrBytes;
this.nodeFilter = nodeFilter;
this.startId = startId;
}

/**
* @return Start ID.
*/
public IgniteUuid startId() {
return startId;
}

/**
* @return Cache configuration.
*/
Expand All @@ -57,10 +65,10 @@ public CacheConfiguration cacheConfiguration() {
}

/**
* @return Start ID.
* @return Node filter.
*/
public IgniteUuid startId() {
return startId;
public IgnitePredicate<ClusterNode> nodeFilter() {
return nodeFilter;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.events.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
Expand Down Expand Up @@ -114,6 +115,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana

final ClusterNode n = e.eventNode();

GridDhtPartitionExchangeId exchId = null;
GridDhtPartitionsExchangeFuture<K, V> exchFut = null;

if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
assert !loc.id().equals(n.id());

Expand All @@ -129,12 +133,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
"Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';

GridDhtPartitionExchangeId exchId = exchangeId(n.id(),
exchId = exchangeId(n.id(),
new AffinityTopologyVersion(e.topologyVersion(), minorTopVer = 0),
e.type());

GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e);
exchFut = exchangeFuture(exchId, e, null);
}
else {
DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;

if (customEvt.data() instanceof DynamicCacheDescriptor) {
DynamicCacheDescriptor desc = (DynamicCacheDescriptor)customEvt.data();

// Check if this event should trigger partition exchange.
if (cctx.cache().dynamicCacheRegistered(desc)) {
exchId = exchangeId(n.id(),
new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer),
e.type());

exchFut = exchangeFuture(exchId, e, desc);
}
}
}

if (exchId != null) {
// Start exchange process.
pendingExchangeFuts.add(exchFut);

Expand All @@ -161,9 +183,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
});
}
else {
// TODO.
}
}
finally {
leaveBusy();
Expand Down Expand Up @@ -225,7 +244,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana

assert discoEvt.topologyVersion() == startTopVer.topologyVersion();

GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt);
GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt, null);

new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();

Expand Down Expand Up @@ -398,16 +417,6 @@ void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture<K, V> fut) {
}
}

/**
* Callback to start exchange for dynamically started cache.
*
* @param cacheDesc Cache descriptor.
*/
public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) {
// TODO IGNITE-45 move to exchange future.
cctx.kernalContext().cache().onCacheStartFinished(cacheDesc);
}

/**
* @return {@code True} if topology has changed.
*/
Expand Down Expand Up @@ -579,11 +588,11 @@ private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersi
* @return Exchange future.
*/
GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt) {
@Nullable DiscoveryEvent discoEvt, @Nullable DynamicCacheDescriptor startDesc) {
GridDhtPartitionsExchangeFuture<K, V> fut;

GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx(
fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId));
fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, startDesc));

if (old != null)
fut = old;
Expand All @@ -606,6 +615,11 @@ public void onExchangeDone(GridDhtPartitionsExchangeFuture<K, V> exchFut) {
fut.cleanUp();
}
}

DynamicCacheDescriptor desc = exchFut.dynamicCacheDescriptor();

if (desc != null)
cctx.cache().onCacheStartFinished(desc);
}

/**
Expand Down Expand Up @@ -654,7 +668,7 @@ private void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullM
refreshPartitions();
}
else
exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg);
exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
}
finally {
leaveBusy();
Expand Down Expand Up @@ -692,7 +706,7 @@ private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSin
scheduleResendPartitions();
}
else
exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg);
exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
}
finally {
leaveBusy();
Expand Down

0 comments on commit 30d96ad

Please sign in to comment.