Skip to content

Commit

Permalink
IGNITE-7514 Affinity assignment should be recalculated when primary n…
Browse files Browse the repository at this point in the history
…ode is not OWNER
  • Loading branch information
ilantukh authored and agoncharuk committed Feb 7, 2018
1 parent 7b37f64 commit faf50f1
Show file tree
Hide file tree
Showing 11 changed files with 424 additions and 86 deletions.
Expand Up @@ -21,7 +21,10 @@
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;


/** /**
* Custom event. * Custom event.
Expand Down Expand Up @@ -85,4 +88,35 @@ public void affinityTopologyVersion(AffinityTopologyVersion affTopVer) {
@Override public String toString() { @Override public String toString() {
return S.toString(DiscoveryCustomEvent.class, this, super.toString()); return S.toString(DiscoveryCustomEvent.class, this, super.toString());
} }

/**
 * Checks whether a discovery event is a {@link DiscoveryCustomEvent} whose custom message
 * requires centralized affinity assignment.
 * <p>
 * Events of any other type (including {@code null}) never qualify; for custom events the decision
 * is delegated to the message-based overload.
 *
 * @param evt Discovery event.
 * @return {@code True} if event is DiscoveryCustomEvent that requires centralized affinity assignment.
 */
public static boolean requiresCentralizedAffinityAssignment(DiscoveryEvent evt) {
    // Short-circuit keeps the cast safe: it is reached only for DiscoveryCustomEvent instances.
    return evt instanceof DiscoveryCustomEvent
        && requiresCentralizedAffinityAssignment(((DiscoveryCustomEvent)evt).customMessage());
}

/**
 * Checks whether a discovery custom message belongs to an event that requires centralized
 * affinity assignment.
 * <p>
 * This is the case for a {@link ChangeGlobalStateMessage} whose {@code activate()} flag is set, and
 * for a {@link SnapshotDiscoveryMessage} that reports both {@code needExchange()} and
 * {@code needAssignPartitions()}.
 *
 * @param msg Discovery custom message, may be {@code null}.
 * @return {@code True} if message belongs to event that requires centralized affinity assignment.
 */
public static boolean requiresCentralizedAffinityAssignment(@Nullable DiscoveryCustomMessage msg) {
    // A null message fails both instanceof checks below, so it falls through to false.
    if (msg instanceof ChangeGlobalStateMessage) {
        ChangeGlobalStateMessage stateMsg = (ChangeGlobalStateMessage)msg;

        if (stateMsg.activate())
            return true;
    }

    if (!(msg instanceof SnapshotDiscoveryMessage))
        return false;

    SnapshotDiscoveryMessage snapshotMsg = (SnapshotDiscoveryMessage)msg;

    return snapshotMsg.needExchange() && snapshotMsg.needAssignPartitions();
}
} }
Expand Up @@ -40,6 +40,7 @@
import org.apache.ignite.events.Event; import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
Expand All @@ -57,7 +58,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.GridPartitionStateMap;
Expand Down Expand Up @@ -161,13 +161,15 @@ void onDiscoveryEvent(int type,
ClusterNode node, ClusterNode node,
AffinityTopologyVersion topVer, AffinityTopologyVersion topVer,
DiscoveryDataClusterState state) { DiscoveryDataClusterState state) {
if (state.transition() || !state.active()) if ((state.transition() || !state.active()) &&
!DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg))
return; return;


if (type == EVT_NODE_JOINED && node.isLocal()) if (type == EVT_NODE_JOINED && node.isLocal())
lastAffVer = null; lastAffVer = null;


if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) ||
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) {
synchronized (mux) { synchronized (mux) {
assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0; assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;


Expand Down Expand Up @@ -1260,10 +1262,12 @@ public GridAffinityAssignmentCache affinity(Integer grpId) {
} }


/** /**
* Applies affinity diff from the received full message.
*
* @param fut Current exchange future. * @param fut Current exchange future.
* @param msg Finish exchange message. * @param msg Finish exchange message.
*/ */
public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, public void applyAffinityFromFullMessage(final GridDhtPartitionsExchangeFuture fut,
final GridDhtPartitionsFullMessage msg) { final GridDhtPartitionsFullMessage msg) {
final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();


Expand Down Expand Up @@ -1396,14 +1400,44 @@ public void onServerJoinWithExchangeMergeProtocol(GridDhtPartitionsExchangeFutur
* @return Computed difference with ideal affinity. * @return Computed difference with ideal affinity.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol( public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol(
final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
{ {
final ExchangeDiscoveryEvents evts = fut.context().events(); final ExchangeDiscoveryEvents evts = fut.context().events();


assert fut.context().mergeExchanges(); assert fut.context().mergeExchanges();
assert evts.hasServerLeft(); assert evts.hasServerLeft();


return onReassignmentEnforced(fut);
}

/**
 * Calculates affinity on coordinator for custom event types that require centralized assignment.
 * <p>
 * Thin entry point: it only checks (via {@code assert}) that the first event of the exchange is a
 * custom event for which {@link DiscoveryCustomEvent#requiresCentralizedAffinityAssignment} holds,
 * and then delegates the actual work to {@code onReassignmentEnforced(fut)}.
 *
 * @param fut Current exchange future.
 * @return Computed difference with ideal affinity.
 * @throws IgniteCheckedException If failed.
 */
public Map<Integer, CacheGroupAffinityMessage> onCustomEventWithEnforcedAffinityReassignment(
final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
{
// Precondition, checked only when assertions are enabled: the exchange was started by a
// custom event that requires centralized affinity assignment.
assert DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());

// The result of the shared reassignment routine is returned unchanged.
return onReassignmentEnforced(fut);
}

/**
* Calculates new affinity assignment on coordinator and creates affinity diff messages for other nodes.
*
* @param fut Current exchange future.
* @return Computed difference with ideal affinity.
* @throws IgniteCheckedException If failed.
*/
private Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
{
final ExchangeDiscoveryEvents evts = fut.context().events();

forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
AffinityTopologyVersion topVer = evts.topologyVersion(); AffinityTopologyVersion topVer = evts.topologyVersion();
Expand All @@ -1418,7 +1452,7 @@ public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergePro
} }
}); });


Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(), Map<Integer, Map<Integer, List<Long>>> diff = initAffinityBasedOnPartitionsAvailability(evts.topologyVersion(),
fut, fut,
NODE_TO_ORDER, NODE_TO_ORDER,
true); true);
Expand Down Expand Up @@ -1642,17 +1676,16 @@ private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion
} }


/** /**
* Called on exchange initiated by server node leave. * Called on exchange initiated by server node leave or custom event with centralized affinity assignment.
* *
* @param fut Exchange future. * @param fut Exchange future.
* @param crd Coordinator flag. * @param crd Coordinator flag.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
* @return {@code True} if affinity should be assigned by coordinator. * @return {@code True} if affinity should be assigned by coordinator.
*/ */
public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { public boolean onCentralizedAffinityChange(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
ClusterNode leftNode = fut.firstEvent().eventNode(); assert (fut.events().hasServerLeft() && !fut.firstEvent().eventNode().isClient()) ||

DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()) : fut.firstEvent();
assert !leftNode.isClient() : leftNode;


if (crd) { if (crd) {
// Need initialize CacheGroupHolders if this node become coordinator on this exchange. // Need initialize CacheGroupHolders if this node become coordinator on this exchange.
Expand Down Expand Up @@ -2066,7 +2099,7 @@ public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinity
initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> initFut) { @Override public void apply(IgniteInternalFuture<?> initFut) {
try { try {
resFut.onDone(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, false)); resFut.onDone(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), fut, NODE_TO_ID, false));
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
resFut.onDone(e); resFut.onDone(e);
Expand All @@ -2077,23 +2110,31 @@ public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinity
return resFut; return resFut;
} }
else else
return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, false)); return new GridFinishedFuture<>(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), fut, NODE_TO_ID, false));
} }


/** /**
* Initializes current affinity assignment based on partitions availability.
* Nodes that have most recent data will be considered affinity nodes.
*
* @param topVer Topology version. * @param topVer Topology version.
* @param fut Exchange future. * @param fut Exchange future.
* @param c Closure converting affinity diff. * @param c Closure converting affinity diff.
* @param initAff {@code True} if need initialize affinity. * @param initAff {@code True} if need initialize affinity.
* @return Affinity assignment. * @return Affinity assignment.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
private <T> Map<Integer, Map<Integer, List<T>>> initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer, private <T> Map<Integer, Map<Integer, List<T>>> initAffinityBasedOnPartitionsAvailability(final AffinityTopologyVersion topVer,
final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsExchangeFuture fut,
final IgniteClosure<ClusterNode, T> c, final IgniteClosure<ClusterNode, T> c,
final boolean initAff) final boolean initAff)
throws IgniteCheckedException { throws IgniteCheckedException {
final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(fut.context().events().lastServerEventVersion()); final boolean enforcedCentralizedAssignment =
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());

final WaitRebalanceInfo waitRebalanceInfo = enforcedCentralizedAssignment ?
new WaitRebalanceInfo(fut.exchangeId().topologyVersion()) :
new WaitRebalanceInfo(fut.context().events().lastServerEventVersion());


final Collection<ClusterNode> aliveNodes = fut.context().events().discoveryCache().serverNodes(); final Collection<ClusterNode> aliveNodes = fut.context().events().discoveryCache().serverNodes();


Expand All @@ -2103,13 +2144,14 @@ private <T> Map<Integer, Map<Integer, List<T>>> initAffinityOnNodeLeft0(final Af
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
CacheGroupHolder grpHolder = groupHolder(topVer, desc); CacheGroupHolder grpHolder = groupHolder(topVer, desc);


if (!grpHolder.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) if (!grpHolder.rebalanceEnabled ||
(fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()) && !enforcedCentralizedAssignment))
return; return;


AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion();


assert affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer) : "Invalid affinity version " + assert (affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer)) || enforcedCentralizedAssignment :
"[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']'; "Invalid affinity version [last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']';


List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer); List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer);
List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment(); List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment();
Expand Down Expand Up @@ -2141,6 +2183,12 @@ private <T> Map<Integer, Map<Integer, List<T>>> initAffinityOnNodeLeft0(final Af
", node=" + newPrimary + ", node=" + newPrimary +
", topVer=" + topVer + ']'; ", topVer=" + topVer + ']';


List<ClusterNode> owners = top.owners(p);

// It is essential that curPrimary node has partition in OWNING state.
if (!owners.isEmpty() && !owners.contains(curPrimary))
curPrimary = owners.get(0);

if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) { if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) {
if (aliveNodes.contains(curPrimary)) { if (aliveNodes.contains(curPrimary)) {
GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); GridDhtPartitionState state = top.partitionState(newPrimary.id(), p);
Expand Down Expand Up @@ -2173,8 +2221,6 @@ private <T> Map<Integer, Map<Integer, List<T>>> initAffinityOnNodeLeft0(final Af
} }


if (newNodes0 == null) { if (newNodes0 == null) {
List<ClusterNode> owners = top.owners(p);

for (ClusterNode owner : owners) { for (ClusterNode owner : owners) {
if (aliveNodes.contains(owner)) { if (aliveNodes.contains(owner)) {
newNodes0 = latePrimaryAssignment(grpHolder.affinity(), newNodes0 = latePrimaryAssignment(grpHolder.affinity(),
Expand Down
Expand Up @@ -1186,6 +1186,7 @@ public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
/** /**
* @param msg Message. * @param msg Message.
* @param topVer Current topology version. * @param topVer Current topology version.
* @param curState Current cluster state.
* @return Exchange action. * @return Exchange action.
* @throws IgniteCheckedException If configuration validation failed. * @throws IgniteCheckedException If configuration validation failed.
*/ */
Expand Down
Expand Up @@ -965,7 +965,7 @@ public void scheduleResendPartitions() {
* For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send, * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
* for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send
*/ */
private void refreshPartitions() { public void refreshPartitions() {
// TODO https://issues.apache.org/jira/browse/IGNITE-6857 // TODO https://issues.apache.org/jira/browse/IGNITE-6857
if (cctx.snapshot().snapshotOperationInProgress()) { if (cctx.snapshot().snapshotOperationInProgress()) {
scheduleResendPartitions(); scheduleResendPartitions();
Expand Down
Expand Up @@ -256,9 +256,9 @@ private String mapString(GridDhtPartitionMap map) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
GridDhtPartitionsExchangeFuture exchFut) { GridDhtPartitionsExchangeFuture exchFut) {
// No-op. return false;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -382,6 +382,11 @@ else if (!node2part.nodeId().equals(loc.id())) {
fullMapString() + ']'); fullMapString() + ']');
} }


/** {@inheritDoc} */
@Override public void afterStateRestored(AffinityTopologyVersion topVer) {
    // No-op: this implementation does nothing after state restore; topVer is ignored.
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
AffinityTopologyVersion topVer = exchFut.topologyVersion(); AffinityTopologyVersion topVer = exchFut.topologyVersion();
Expand Down
Expand Up @@ -213,9 +213,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
// TODO ignite-db // TODO ignite-db
throw new IgniteException(e); throw new IgniteException(e);
} }

// Todo log moving state
casState(state.get(), MOVING);
} }


/** /**
Expand Down
Expand Up @@ -117,11 +117,19 @@ public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
/** /**
* @param affVer Affinity version. * @param affVer Affinity version.
* @param exchFut Exchange future. * @param exchFut Exchange future.
* @return {@code True} if partitions must be refreshed.
* @throws IgniteInterruptedCheckedException If interrupted. * @throws IgniteInterruptedCheckedException If interrupted.
*/ */
public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut) public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut)
throws IgniteInterruptedCheckedException; throws IgniteInterruptedCheckedException;


/**
* Initializes local data structures after partitions are restored from persistence.
*
* @param topVer Topology version.
*/
public void afterStateRestored(AffinityTopologyVersion topVer);

/** /**
* Post-initializes this topology. * Post-initializes this topology.
* *
Expand Down

0 comments on commit faf50f1

Please sign in to comment.