Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-20317 return metastorage invokes for zones changes in handlers, immediately recalculate data nodes when scale up/down is immediate. #2685

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
Expand Down Expand Up @@ -150,17 +151,17 @@ public class DistributionZoneManager implements IgniteComponent {
private final LogicalTopologyEventListener topologyEventListener = new LogicalTopologyEventListener() {
@Override
public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
updateLogicalTopologyInMetaStorage(newTopology, false);
updateLogicalTopologyInMetaStorage(newTopology);
}

@Override
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
updateLogicalTopologyInMetaStorage(newTopology, false);
updateLogicalTopologyInMetaStorage(newTopology);
}

@Override
public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
updateLogicalTopologyInMetaStorage(newTopology, true);
updateLogicalTopologyInMetaStorage(newTopology);
}
};

Expand Down Expand Up @@ -295,6 +296,14 @@ private CompletableFuture<Void> onUpdateScaleUpBusy(AlterZoneEventParameters par

long causalityToken = parameters.causalityToken();

if (newScaleUp == IMMEDIATE_TIMER_VALUE) {
return saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken).thenRun(() -> {
// TODO: causalityOnUpdateScaleUp will be removed https://issues.apache.org/jira/browse/IGNITE-20604,
// catalog must be used instead
causalityDataNodesEngine.causalityOnUpdateScaleUp(causalityToken, zoneId, IMMEDIATE_TIMER_VALUE);
});
}

// It is safe to call zonesState.get(zoneId) here with respect to NPE because meta storage notifications are single-threaded
// and this map will have been initialized on manager start or by a catalog notification.
ZoneState zoneState = zonesState.get(zoneId);
Expand Down Expand Up @@ -329,6 +338,14 @@ private CompletableFuture<Void> onUpdateScaleDownBusy(AlterZoneEventParameters p

long causalityToken = parameters.causalityToken();

if (newScaleDown == IMMEDIATE_TIMER_VALUE) {
return saveDataNodesToMetaStorageOnScaleDown(zoneId, causalityToken).thenRun(() -> {
// TODO: causalityOnUpdateScaleDown will be removed https://issues.apache.org/jira/browse/IGNITE-20604,
// catalog must be used instead
causalityDataNodesEngine.causalityOnUpdateScaleDown(causalityToken, zoneId, IMMEDIATE_TIMER_VALUE);
});
}

// It is safe to call zonesState.get(zoneId) here with respect to NPE because meta storage notifications are single-threaded
// and this map will have been initialized on manager start or by a catalog notification.
ZoneState zoneState = zonesState.get(zoneId);
Expand Down Expand Up @@ -387,8 +404,9 @@ private CompletableFuture<Void> onUpdateFilter(AlterZoneEventParameters paramete
*
* @param zone Zone descriptor.
* @param causalityToken Causality token.
* @return Future reflecting the completion of creating or restoring a zone.
*/
private void createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
private CompletableFuture<Void> createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
int zoneId = zone.id();

VaultEntry topologyAugmentationMapFromVault = vaultMgr.get(zoneTopologyAugmentationVault(zoneId)).join();
Expand All @@ -403,7 +421,8 @@ private void createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causa

Set<Node> dataNodes = logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());

initDataNodesAndTriggerKeysInMetaStorage(zoneId, causalityToken, dataNodes);
return initDataNodesAndTriggerKeysInMetaStorage(zoneId, causalityToken, dataNodes)
.thenRun(() -> causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone));
} else {
// Restart case: topologyAugmentationMap was already saved while the cluster was running.
ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap = fromBytes(topologyAugmentationMapFromVault.value());
Expand All @@ -424,6 +443,8 @@ private void createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causa
}

causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone);

return completedFuture(null);
}

/**
Expand Down Expand Up @@ -493,8 +514,9 @@ private void restoreTimers(
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
* @param dataNodes Data nodes.
* @return Future reflecting the completion of initialisation of zone's keys in meta storage.
*/
private void initDataNodesAndTriggerKeysInMetaStorage(
private CompletableFuture<Void> initDataNodesAndTriggerKeysInMetaStorage(
int zoneId,
long revision,
Set<Node> dataNodes
Expand All @@ -512,27 +534,32 @@ private void initDataNodesAndTriggerKeysInMetaStorage(

Iif iif = iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, ops().yield(false));

metaStorageManager.invoke(iif).whenComplete((res, e) -> {
if (e != null) {
LOG.error(
"Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]",
e,
zoneId,
dataNodes,
revision
);
} else if (res.getAsBoolean()) {
LOG.info("Update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]",
zoneId, dataNodes, revision);
} else {
LOG.debug(
"Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]",
zoneId,
dataNodes,
revision
);
}
});
return metaStorageManager.invoke(iif)
.thenApply(StatementResult::getAsBoolean)
.whenComplete((invokeResult, e) -> {
if (e != null) {
LOG.error(
"Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]",
e,
zoneId,
dataNodes,
revision
);
} else if (invokeResult) {
LOG.info("Update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]",
zoneId,
dataNodes,
revision
);
} else {
LOG.debug(
"Failed to update zones' dataNodes value [zoneId = {}, dataNodes = {}, revision = {}]",
zoneId,
dataNodes,
revision
);
}
}).thenCompose((ignored) -> completedFuture(null));
} finally {
busyLock.leaveBusy();
}
Expand All @@ -544,7 +571,7 @@ private void initDataNodesAndTriggerKeysInMetaStorage(
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
*/
private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {
private CompletableFuture<Void> removeTriggerKeysAndDataNodes(int zoneId, long revision) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
Expand All @@ -556,20 +583,23 @@ private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {

Iif iif = iif(triggerKeyCondition, removeKeysUpd, ops().yield(false));

metaStorageManager.invoke(iif).whenComplete((res, e) -> {
if (e != null) {
LOG.error(
"Failed to delete zone's dataNodes keys [zoneId = {}, revision = {}]",
e,
zoneId,
revision
);
} else if (res.getAsBoolean()) {
LOG.info("Delete zone's dataNodes keys [zoneId = {}, revision = {}]", zoneId, revision);
} else {
LOG.debug("Failed to delete zone's dataNodes keys [zoneId = {}, revision = {}]", zoneId, revision);
}
});
return metaStorageManager.invoke(iif)
.thenApply(StatementResult::getAsBoolean)
.whenComplete((invokeResult, e) -> {
if (e != null) {
LOG.error(
"Failed to delete zone's dataNodes keys [zoneId = {}, revision = {}]",
e,
zoneId,
revision
);
} else if (invokeResult) {
LOG.info("Delete zone's dataNodes keys [zoneId = {}, revision = {}]", zoneId, revision);
} else {
LOG.debug("Failed to delete zone's dataNodes keys [zoneId = {}, revision = {}]", zoneId, revision);
}
})
.thenCompose(ignored -> completedFuture(null));
} finally {
busyLock.leaveBusy();
}
Expand All @@ -580,10 +610,8 @@ private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {
* in meta storage.
*
* @param newTopology Logical topology snapshot.
* @param topologyLeap Flag that indicates whether this updates was trigger by
* {@link LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} or not.
*/
private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot newTopology, boolean topologyLeap) {
private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot newTopology) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
Expand All @@ -597,12 +625,7 @@ private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot newTopol
// Very first start of the cluster, so we just initialize zonesLogicalTopologyVersionKey
updateCondition = notExists(zonesLogicalTopologyVersionKey());
} else {
if (topologyLeap) {
updateCondition = value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
} else {
// This condition may be stronger, as far as we receive topology events one by one.
updateCondition = value(zonesLogicalTopologyVersionKey()).eq(longToBytes(newTopology.version() - 1));
}
updateCondition = value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
}

Iif iff = iif(
Expand Down Expand Up @@ -1358,17 +1381,14 @@ private void registerCatalogEventListenersOnStartManagerBusy() {

CreateZoneEventParameters params = (CreateZoneEventParameters) parameters;

createOrRestoreZoneStateBusy(params.zoneDescriptor(), params.causalityToken());

return completedFuture(false);
return createOrRestoreZoneStateBusy(params.zoneDescriptor(), params.causalityToken())
.thenCompose((ignored) -> completedFuture(false));
}));

catalogManager.listen(ZONE_DROP, (parameters, exception) -> inBusyLock(busyLock, () -> {
assert exception == null : parameters;

onDropZoneBusy((DropZoneEventParameters) parameters);

return completedFuture(false);
return onDropZoneBusy((DropZoneEventParameters) parameters).thenCompose((ignored) -> completedFuture(false));
}));

catalogManager.listen(ZONE_ALTER, new ManagerCatalogAlterZoneEventListener());
Expand Down Expand Up @@ -1409,7 +1429,7 @@ protected CompletableFuture<Void> onFilterUpdate(AlterZoneEventParameters parame
}
}

private void onDropZoneBusy(DropZoneEventParameters parameters) {
private CompletableFuture<Void> onDropZoneBusy(DropZoneEventParameters parameters) {
int zoneId = parameters.zoneId();

long causalityToken = parameters.causalityToken();
Expand All @@ -1418,10 +1438,10 @@ private void onDropZoneBusy(DropZoneEventParameters parameters) {

zoneState.stopTimers();

removeTriggerKeysAndDataNodes(zoneId, causalityToken);
return removeTriggerKeysAndDataNodes(zoneId, causalityToken).thenRun(() -> {
causalityDataNodesEngine.onDelete(causalityToken, zoneId);

causalityDataNodesEngine.onDelete(causalityToken, zoneId);

zonesState.remove(zoneId);
zonesState.remove(zoneId);
});
}
}
Loading