IGNITE-9913 #6942

Closed
wants to merge 67 commits

Conversation

anton-vinogradov
Contributor

Signed-off-by: Anton Vinogradov <av@apache.org>

Comment on lines +106 to +114
GridAffinityAssignmentCache aff = grp.affinity();

Set<Integer> failedPrimaries = aff.primaryPartitions(fut.exchangeId().eventNode().id(), aff.lastVersion());
Set<Integer> locBackups = aff.backupPartitions(fut.sharedContext().localNodeId(), aff.lastVersion());

for (int part : failedPrimaries) {
    if (locBackups.contains(part))
        return true;
}
Contributor

This code does not look correct. A node may OWN a partition, but it will not be reported by aff.backupPartitions() because that method returns the current assignment, not the local partition state. GridDhtPartitionTopology should be read for this information. A corresponding test needs to be added.
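A minimal sketch of the check the comment asks for, reading the local partition state from GridDhtPartitionTopology instead of the assignment (an assumption-based illustration using localPartitions()/id()/state(), not the actual PR code):

GridAffinityAssignmentCache aff = grp.affinity();

Set<Integer> failedPrimaries = aff.primaryPartitions(fut.exchangeId().eventNode().id(), aff.lastVersion());

// Check what the local node actually owns, not what the assignment says it should back up.
for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
    if (part.state() == GridDhtPartitionState.OWNING && failedPrimaries.contains(part.id()))
        return true;
}

return false;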

Contributor Author

Copying the answer from the issue.
Do you mean we also have to serve MOVING and RENTING partitions?

So, Set failedPrimaries = aff.primaryPartitions(fut.exchangeId().eventNode().id(), aff.lastVersion()); is a correct calculation, but
Set locBackups = aff.backupPartitions(fut.sharedContext().localNodeId(), aff.lastVersion()); should be replaced with dht.localPartitions() usage?

if (compatibilityNode || (crd && fut.localJoinExchange())) {
    GridDiscoveryManager disco = fut.sharedContext().discovery();

    if (protocolVer > 2 && fut.exchangeId().isLeft() && fut.isEventNodeInBaseline() &&
Member

This protocol works only if the cluster is in-memory and baseline auto-adjust is enabled.
How does it work when persistence is enabled, with and without baseline auto-adjust?

Contributor Author

What protocol do you mean?
protocolVer == 3 was introduced by me.

Member

Yes, I mean the protocol you introduced.

Contributor Author

It works when you have a baseline ... fut.isEventNodeInBaseline()
I can't understand the question :(

Member

disco.baselineChanged(fut.sharedContext().exchange().readyAffinityVersion(), fut.initialVersion())
This code will return true only if the baseline was changed by the NODE_LEFT event itself. The baseline is changed automatically in the discovery cache only if it is an in-memory cluster and auto-adjust is enabled. In other cases the baseline is changed manually (if auto-adjust is disabled) or by a separate event (ChangeGlobalState) right after the topology event (if auto-adjust is enabled).

Contributor Author

That's exactly the case covered by this fix :)

Member

But you run this PME only if the baseline is changed: disco.baselineChanged(fut.sharedContext().exchange().readyAffinityVersion(), fut.initialVersion())

Contributor Author

My bad :)

public boolean baselineChanged(AffinityTopologyVersion topVer1, AffinityTopologyVersion topVer2) {
    ...
    return Objects.equals(baseline1, baseline2);
}

Contributor Author

Typo fixed.

Contributor Author

Issue solved?

Comment on lines 1031 to 1035
public boolean isEventNodeInBaseline() {
    BaselineTopology top = cctx.discovery().discoCache().state().baselineTopology();

    return top != null && top.consistentIds().contains(firstDiscoEvt.eventNode().consistentId());
}
Contributor

Looks like there is a race here. discovery().discoCache() returns the current snapshot, which is updated in the discovery thread, while this method is executed in the exchange thread. The returned disco cache may have a topology version different from the exchange future's topology version.

Contributor

BTW, what happens when exchanges are merged? Two failed nodes -> do we need to check both? Failed and joined -> we should not allow the fast-path exchange.

Contributor Author

  1. Is the correct fix to use a versioned disco cache?
    cctx.discovery().discoCache(topVer)... correct?

  2. We never merge a left event with anything:
    if (baselineNodeLeft) baselineNodeLeft = true; merge = false;
    so even a cascade of failures will be processed one by one.

Contributor

  1. Seems you can use firstEvtDiscoCache
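A hedged sketch of that suggestion, reading the baseline from the disco cache captured for the exchange's first event instead of the current discovery snapshot (assuming firstEvtDiscoCache is the exchange future's field holding that snapshot):

public boolean isEventNodeInBaseline() {
    // The snapshot captured for the first event is consistent with the exchange topology
    // version, so the race with the discovery thread goes away.
    BaselineTopology top = firstEvtDiscoCache.state().baselineTopology();

    return top != null && top.consistentIds().contains(firstDiscoEvt.eventNode().consistentId());
}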

/**
* @throws IgniteCheckedException If failed.
*/
private void waitRecovery() throws IgniteCheckedException {
Contributor

The code in this method is very similar to waitPartitionRelease(). Can we avoid the duplication?

Contributor Author

Sure, the code was deliberately duplicated to simplify the PoC review.
I will compact it before the final review request.
The same applies to finishLocalTxs/recoverLocalTxsByNode.
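One possible way to compact it (an illustrative sketch under assumed names, not the PR code) is to extract the shared wait-and-log loop into a helper that both waitPartitionRelease() and waitRecovery() call with their own future and description:

/** Waits for the given future, logging a warning on every timeout (illustrative helper; the name is an assumption). */
private void waitWithTimeoutWarnings(IgniteInternalFuture<?> releaseFut, String what) throws IgniteCheckedException {
    long timeout = cctx.kernalContext().config().getNetworkTimeout();

    while (true) {
        try {
            releaseFut.get(timeout, TimeUnit.MILLISECONDS);

            return;
        }
        catch (IgniteFutureTimeoutCheckedException ignored) {
            // Diagnostics could be dumped here before retrying.
            U.warn(log, "Failed to wait for " + what + " within timeout [timeout=" + timeout + ']');
        }
    }
}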

@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
    CacheGroupHolder cache = getOrCreateGroupHolder(fut.initialVersion(), desc);

    cache.aff.reinitializeWithoutOfflineNodes(fut.initialVersion());
Member

This is not a correct affinity initialization.
You don't take into account that the cluster may have MOVING partitions. In this case a primary node can be chosen from a MOVING backup, and affinity is not recalculated after the PME ends. You can see a correct affinity recalculation in org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager#onServerLeftWithExchangeMergeProtocol.

Contributor Author

Since I'm using assignment (not idealAssignment), which takes MOVING into account (correct?), shouldn't this be the correct initialization?

Member

There is no guarantee that the next backup will have the partition in OWNING state.
Case 1:
Suppose you have a partition with 2 backups. The primary is OWNING, the other nodes are MOVING:
OMM
The second backup completes rebalancing and now has the partition in OWNING state:
OMO
Now the primary node leaves and you have:
MO in the affinity distribution. A MOVING partition becomes primary, which is incorrect behavior.
Case 2:
You lost your last OWNING partition in the affinity. In this case detectLostPartitions should be triggered.

You also nullify WaitRebalanceInfo. You shouldn't do this, because if rebalancing is currently running, late affinity assignment will not be triggered after it completes.

Contributor Author

  1. Let's clarify: aff.assignment contains only OWNING partitions (correct?)
    I'm just shrinking the previous assignment, without recalculation.
    I see no MOVING issues here, am I missing something?

  2. Rebalance will be restarted in case of MOVING partitions.
    I see no problem here :(

Member

  1. The affinity assignment contains both OWNING and MOVING partitions.
  2. Rebalance is started, but late affinity assignment is not, because of the nullified WaitRebalanceInfo.
  3. detectLostPartitions should be triggered after PME, because if there are no OWNING copies left the remaining partitions should be marked LOST according to the loss policy.

/**
* @throws IgniteCheckedException If failed.
*/
private void waitRecovery() throws IgniteCheckedException {
Member

I see no tests in the PR :(
The following 2 cases need to be tested explicitly, to make sure that your optimization works correctly (a rough fragment for the message-blocking step is sketched after the list):
Case 1:

  1. Create a transaction that touches a primary partition belonging to the node that will leave.
  2. Make sure that the transaction is prepared on primary and backups and then block the finish request to backup.
  3. Make sure that new transactions can be started and committed on nodes that don't contain affected partitions
  4. Make sure that new transactions are blocked on nodes that contain affected partitions.
  5. Unblock finish request messages.
  6. Make sure that transactions waiting for local recovery are committed and PME on corresponding nodes finished.
  7. Make sure that new transactions are unblocked and committed on nodes where PME is finished.

Case 2:

  1. Create a transaction that touches a primary partition belonging to the node that will leave.
  2. Make sure that the transaction is prepared on primary and backups and then block the finish request to backup.
  3. Kill node, make sure that transaction recovery is started and then block recovery messages.
  4. Make sure that PME on node left is finished on nodes that don't contain affected partitions.
  5. Make sure that new transactions can be started and committed on nodes that don't contain affected partitions
  6. Make sure that new transactions are blocked on nodes that contain affected partitions.
  7. Unblock recovery messages.
  8. Make sure that transactions are recovered and committed and PME on corresponding nodes finished.
  9. Make sure that new transactions are unblocked and committed on nodes where PME is finished.
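For step 2 of both cases, blocking the finish request in a test is typically done with TestRecordingCommunicationSpi (a hedged fragment; it assumes the SPI is configured as the communication SPI of the failing node, and failingNode/backupNodeName are placeholder names):

// Hold GridDhtTxFinishRequest messages addressed to the backup until stopBlock() is called.
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(failingNode);

spi.blockMessages(GridDhtTxFinishRequest.class, backupNodeName);

// ... stop the failing node, run the checks from steps 3-6 ...

// Unblock the held messages (step 5 of Case 1 / step 7 of Case 2).
spi.stopBlock();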

Comment on lines 1808 to 1819
public boolean baselineChanged(AffinityTopologyVersion topVer1, AffinityTopologyVersion topVer2) {
    assert !topVer1.equals(topVer2);

    DiscoCache disco1 = discoCache(topVer1);
    DiscoCache disco2 = discoCache(topVer2);

    BaselineTopology baseline1 = disco1 != null ? disco1.state().baselineTopology() : null;
    BaselineTopology baseline2 = disco2 != null ? disco2.state().baselineTopology() : null;

    return !Objects.equals(baseline1, baseline2);
}

Contributor

Looks like we only care about the baseline change compared to the previous version. Better to add a flag to DiscoCache indicating whether the baseline has changed compared to the previous version, and get rid of the internal collections comparison.
See org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#createDiscoCache.

Contributor Author

Looks like it's possible to just use firstEvtDiscoCache.state().localBaselineAutoAdjustment()?
Updated the PR.

if (compatibilityNode || (crd && fut.localJoinExchange())) {
    GridDiscoveryManager disco = fut.sharedContext().discovery();

    if (protocolVer > 2 && fut.exchangeId().isLeft() && fut.isFirstEventNodeInBaseline() &&
Contributor

Protocol v3 should be disabled if the previous assignment is not ideal. Otherwise it means rebalancing is not finished, and some nodes may see different local partition states at the moment of assignment calculation.

GridAffinityAssignmentCache aff = grp.affinity();

Set<Integer> failedPrimaries = aff.primaryPartitions(fut.exchangeId().eventNode().id(), aff.lastVersion());
Set<Integer> loc = grp.topology().localPartitionMap().keySet();
Contributor

You should only take OWNING partitions into account here (skip RENTING partitions; MOVING partitions are not possible because the previous assignment is ideal).

@@ -1357,7 +1370,9 @@ private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedExceptio

exchCtx.events().warnNoAffinityNodes(cctx);

centralizedAff = cctx.affinity().onCentralizedAffinityChange(this, crd);
centralizedAff = exchCtx.baselineNodeLeft() ?
Contributor

centralizedAff is a part of the old protocol. It's better not to mix the old and new protocols; some refactoring is needed.

@@ -2225,7 +2351,7 @@ private String exchangeTimingsLogMessage(String header, List<String> timings) {

boolean locNodeNotCrd = crd == null || !crd.isLocal();

if (locNodeNotCrd && (serverNodeDiscoveryEvent() || localJoinExchange()))
if ((locNodeNotCrd && (serverNodeDiscoveryEvent() || localJoinExchange())) || exchCtx.baselineNodeLeft())
Contributor

Make sure the partition loss policy works well with the new protocol. I suggest removing nodes one by one under load until some subset of partitions has no owners left.

AffinityTopologyVersion topVer = sharedContext().exchange().readyAffinityVersion();

// Failed node's primary partitions or just all local backups in case of possible exchange merge.
Set<Integer> parts = nodeId != null ?
Contributor

Is this intended as an optimization?
It is not really needed, because at this point all possible updates have been received and all remaining gaps can be safely removed.

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@anton-vinogradov
Contributor Author

continued at #7069
