IGNITE-8405 Update partition owners during exchange in 1 operation. - Fixes #3929.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Jokser authored and agoncharuk committed May 3, 2018
1 parent 8dd9c5d commit 314156f
Showing 6 changed files with 174 additions and 79 deletions.
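
At a glance, the change collapses the old per-partition setOwners calls into a single resetOwners call per cache group during exchange. The stand-in interface below only restates the two method signatures taken from the GridDhtPartitionTopology hunk further down; the interface name and imports are illustrative scaffolding, not Ignite code.

import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Illustrative stand-in for the API change; only the two method signatures are taken from the diff.
interface PartitionTopologyApiSketch {
    // Before: called once per partition; returned the node IDs that had to fully reload that partition.
    Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq);

    // After: called once per exchange with all partitions at once; returns, per node, the set of
    // partitions that must be rebalanced fully (partitions with WAL history are rebalanced historically).
    Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory);
}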
@@ -28,6 +28,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -50,15 +51,14 @@
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;

/**
* Partition topology for node which does not have any local partitions.
@@ -1138,39 +1138,62 @@ private void removeNode(UUID nodeId) {
}

/** {@inheritDoc} */
@Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) {
Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>();
@Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
Map<UUID, Set<Integer>> result = new HashMap<>();

lock.writeLock().lock();

try {
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
GridDhtPartitionMap partMap = e.getValue();
UUID remoteNodeId = e.getKey();
// Process remote partitions.
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> newOwners = entry.getValue();

if (!partMap.containsKey(p))
continue;
for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
GridDhtPartitionMap partMap = remotes.getValue();

if (partMap.get(p) == OWNING && !owners.contains(remoteNodeId)) {
partMap.put(p, MOVING);
GridDhtPartitionState state = partMap.get(part);

if (state == null || state != OWNING)
continue;

if (!haveHistory)
result.add(remoteNodeId);
if (!newOwners.contains(remoteNodeId)) {
partMap.put(part, MOVING);

partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());

U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
"[nodeId=" + remoteNodeId + ", groupId=" + grpId +
", partId=" + p + ", haveHistory=" + haveHistory + "]");
result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
result.get(remoteNodeId).add(part);
}
}
}

part2node.put(p, owners);
for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
UUID nodeId = entry.getKey();
Set<Integer> partsToRebalance = entry.getValue();

if (updateSeq)
this.updateSeq.incrementAndGet();
}
finally {
if (!partsToRebalance.isEmpty()) {
Set<Integer> historical = partsToRebalance.stream()
.filter(haveHistory::contains)
.collect(Collectors.toSet());

// Filter out partitions having WAL history.
partsToRebalance.removeAll(historical);

U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter "
+ "[grpId=" + grpId
+ ", nodeId=" + nodeId
+ ", partsFull=" + S.compact(partsToRebalance)
+ ", partsHistorical=" + S.compact(historical) + "]");
}
}

for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet())
part2node.put(entry.getKey(), entry.getValue());

updateSeq.incrementAndGet();
} finally {
lock.writeLock().unlock();
}

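A condensed, standalone model of the control flow in the hunk above may be easier to follow than the interleaved diff: stale owners are demoted to MOVING, the affected partitions are grouped per node, and each node's set is then split into historically and fully rebalanced partitions. The class name and the string-based partition states below are placeholders, not Ignite types.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Standalone sketch of the resetOwners control flow; this is not Ignite code.
class ResetOwnersSketch {
    /** partition -> (node ID -> partition state, e.g. "OWNING" or "MOVING"). */
    final Map<Integer, Map<UUID, String>> states = new HashMap<>();

    Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
        Map<UUID, Set<Integer>> result = new HashMap<>();

        for (Map.Entry<Integer, Set<UUID>> e : ownersByUpdCounters.entrySet()) {
            int part = e.getKey();
            Set<UUID> newOwners = e.getValue();

            // Demote every current owner that is not among the nodes with the maximal update counter.
            for (Map.Entry<UUID, String> node : states.getOrDefault(part, Collections.<UUID, String>emptyMap()).entrySet()) {
                if ("OWNING".equals(node.getValue()) && !newOwners.contains(node.getKey())) {
                    node.setValue("MOVING");

                    result.computeIfAbsent(node.getKey(), id -> new HashSet<>()).add(part);
                }
            }
        }

        // Partitions with WAL history are rebalanced historically; only the rest stay in the
        // returned per-node sets, i.e. the partitions that must be reloaded fully.
        for (Set<Integer> parts : result.values()) {
            Set<Integer> historical = parts.stream().filter(haveHistory::contains).collect(Collectors.toSet());

            parts.removeAll(historical);
        }

        return result;
    }
}

The committed implementation additionally bumps the update sequence of each modified partition map, rewrites part2node for all partitions in ownersByUpdCounters, and logs the per-node partsFull/partsHistorical warning, which the sketch omits.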
@@ -389,16 +389,15 @@ public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
public boolean rebalanceFinished(AffinityTopologyVersion topVer);

/**
* Make nodes from provided set owners for a given partition.
* State of all current owners that aren't contained in the set will be reset to MOVING.
* Calculates the nodes and partitions whose state is outdated and which must be rebalanced.
* State of all current owners that aren't contained in the given {@code ownersByUpdCounters} will be reset to MOVING.
*
* @param p Partition ID.
* @param owners Set of new owners.
* @param haveHistory {@code True} if there is WAL history to rebalance given partition.
* @param updateSeq If should increment sequence when updated.
* @return Set of node IDs that should reload partitions.
* @param ownersByUpdCounters Map from partition ID to the set of node IDs that have the most up-to-date
* state for that partition (maximal update counter) and should therefore keep the OWNING state.
* @param haveHistory Set of partitions which have WAL history available to rebalance from.
* @return Map from node ID to the set of partitions that should be rebalanced <b>fully</b> by that node.
*/
public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq);
public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory);

/**
* Callback on exchange done.
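A small usage example of the resetOwners contract declared above (node IDs and partition numbers are invented, and no Ignite classes are constructed): the caller passes, for every partition, the nodes holding the maximal update counter plus the set of partitions that can be rebalanced from WAL history, and reads back which nodes must fully rebalance which partitions.

import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Made-up inputs illustrating the resetOwners contract; the expected outcome is described in comments only.
class ResetOwnersContractExample {
    public static void main(String[] args) {
        UUID nodeA = UUID.randomUUID();
        UUID nodeB = UUID.randomUUID();

        // For partitions 0 and 1, only nodeA reported the maximal update counter.
        Map<Integer, Set<UUID>> ownersByUpdCounters = Map.of(
            0, Set.of(nodeA),
            1, Set.of(nodeA));

        // WAL history is available only for partition 0.
        Set<Integer> haveHistory = Set.of(0);

        // If nodeB currently marks both partitions as OWNING with stale counters, then
        // top.resetOwners(ownersByUpdCounters, haveHistory) demotes nodeB to MOVING for both and
        // returns {nodeB=[1]}: partition 0 is rebalanced from history, partition 1 is reloaded fully.
        System.out.println("owners=" + ownersByUpdCounters + ", haveHistory=" + haveHistory);
    }
}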
@@ -30,6 +30,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -60,6 +61,7 @@
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
@@ -2102,59 +2104,82 @@ else if (plc != PartitionLossPolicy.IGNORE) {
}

/** {@inheritDoc} */
@Override public Set<UUID> setOwners(int p, Set<UUID> ownersByUpdCounters, boolean haveHistory, boolean updateSeq) {
Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>();
@Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
Map<UUID, Set<Integer>> result = new HashMap<>();

ctx.database().checkpointReadLock();

try {
lock.writeLock().lock();

try {
GridDhtLocalPartition locPart = locParts.get(p);
// First process local partitions.
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> newOwners = entry.getValue();

GridDhtLocalPartition locPart = localPartition(part);

if (locPart != null) {
if (locPart.state() == OWNING && !ownersByUpdCounters.contains(ctx.localNodeId())) {
rebalancePartition(p, haveHistory);
if (locPart == null || locPart.state() != OWNING)
continue;

if (!haveHistory)
result.add(ctx.localNodeId());
if (!newOwners.contains(ctx.localNodeId())) {
rebalancePartition(part, haveHistory.contains(part));

U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
"[nodeId=" + ctx.localNodeId() + ", grp=" + grp.cacheOrGroupName() +
", partId=" + p + ", haveHistory=" + haveHistory + "]");
result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>());
result.get(ctx.localNodeId()).add(part);
}
}

for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
UUID remoteNodeId = e.getKey();
GridDhtPartitionMap partMap = e.getValue();
// Then process remote partitions.
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> newOwners = entry.getValue();

if (!partMap.containsKey(p))
continue;
for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
GridDhtPartitionMap partMap = remotes.getValue();

GridDhtPartitionState state = partMap.get(part);

if (partMap.get(p) == OWNING && !ownersByUpdCounters.contains(remoteNodeId)) {
partMap.put(p, MOVING);
if (state == null || state != OWNING)
continue;

if (!haveHistory)
result.add(remoteNodeId);
if (!newOwners.contains(remoteNodeId)) {
partMap.put(part, MOVING);

partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());

if (partMap.nodeId().equals(ctx.localNodeId()))
this.updateSeq.setIfGreater(partMap.updateSequence());
if (partMap.nodeId().equals(ctx.localNodeId()))
updateSeq.setIfGreater(partMap.updateSequence());

U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
"[nodeId=" + remoteNodeId + ", grp=" + grp.cacheOrGroupName() +
", partId=" + p + ", haveHistory=" + haveHistory + "]");
result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
result.get(remoteNodeId).add(part);
}
}
}

if (updateSeq) {
long updSeq = this.updateSeq.incrementAndGet();
for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
UUID nodeId = entry.getKey();
Set<Integer> rebalancedParts = entry.getValue();

if (!rebalancedParts.isEmpty()) {
Set<Integer> historical = rebalancedParts.stream()
.filter(haveHistory::contains)
.collect(Collectors.toSet());

node2part = new GridDhtPartitionFullMap(node2part, updSeq);
// Filter out partitions having WAL history.
rebalancedParts.removeAll(historical);

U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter "
+ "[grp=" + grp.cacheOrGroupName()
+ ", nodeId=" + nodeId
+ ", partsFull=" + S.compact(rebalancedParts)
+ ", partsHistorical=" + S.compact(historical) + "]");
}
}

node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
} finally {
lock.writeLock().unlock();
}
@@ -59,7 +59,6 @@
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -73,7 +72,6 @@
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
import org.apache.ignite.internal.processors.cache.StateChangeRequest;
@@ -2375,19 +2373,18 @@ else if (cntr == maxCntr.cnt)
}
}

for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
int p = e.getKey();
long maxCntr = e.getValue().cnt;

entryLeft--;
Map<Integer, Set<UUID>> ownersByUpdCounters = new HashMap<>(maxCntrs.size());
for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);

if (entryLeft != 0 && maxCntr == 0)
continue;
Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory);

Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, haveHistory.contains(p), entryLeft == 0);
for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
UUID nodeId = e.getKey();
Set<Integer> parts = e.getValue();

for (UUID nodeId : nodesToReload)
partsToReload.put(nodeId, top.groupId(), p);
for (int part : parts)
partsToReload.put(nodeId, top.groupId(), part);
}
}

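For completeness, the caller-side flow from the exchange hunk above, restated with plain stand-ins (a BiFunction replaces the topology object and partsToReload is a plain map rather than Ignite's structure): build the owners-by-update-counter map once, make a single resetOwners call, and record every returned (node, partition) pair for a full reload.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;

// Sketch of the caller-side shape of the change; ownersByUpdCounters maps each partition to the
// nodes that reported the maximal update counter, and partsToReload collects (node, partition)
// pairs that need a full reload.
class ExchangeCallSketch {
    static void resetOwnersOnce(
        Map<Integer, Set<UUID>> ownersByUpdCounters,
        Set<Integer> haveHistory,
        BiFunction<Map<Integer, Set<UUID>>, Set<Integer>, Map<UUID, Set<Integer>>> resetOwners,
        Map<UUID, Set<Integer>> partsToReload
    ) {
        // One bulk call per cache group replaces the old per-partition setOwners loop.
        Map<UUID, Set<Integer>> partitionsToRebalance = resetOwners.apply(ownersByUpdCounters, haveHistory);

        for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet())
            partsToReload.computeIfAbsent(e.getKey(), id -> new HashSet<>()).addAll(e.getValue());
    }
}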
