Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
Expand Down Expand Up @@ -140,6 +142,9 @@ public class MetaStorageManagerImpl implements MetaStorageManager {

private final RaftGroupOptionsConfigurer raftGroupOptionsConfigurer;

/** Gets completed when raft service is started. */
private final CompletableFuture<Void> raftNodeStarted = new CompletableFuture<>();

/**
* The constructor.
*
Expand Down Expand Up @@ -294,13 +299,16 @@ private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<Stri
? startFollowerNode(metaStorageNodes, disruptorConfig)
: startLearnerNode(metaStorageNodes, disruptorConfig);

return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(
thisNodeName,
raftService,
busyLock,
clusterTime,
() -> clusterService.topologyService().localMember().id())
);
raftNodeStarted.complete(null);

return raftServiceFuture
.thenApply(raftService -> new MetaStorageServiceImpl(
thisNodeName,
raftService,
busyLock,
clusterTime,
() -> clusterService.topologyService().localMember().id())
);
} catch (NodeStoppingException e) {
return failedFuture(e);
}
Expand All @@ -325,7 +333,7 @@ private CompletableFuture<? extends RaftGroupService> startFollowerNode(
followerListener = new MetaStorageListener(storage, clusterTime);

CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
raftNodeId(localPeer),
configuration,
followerListener,
RaftGroupEventsListener.noopLsnr,
Expand Down Expand Up @@ -370,7 +378,7 @@ private CompletableFuture<? extends RaftGroupService> startLearnerNode(
learnerListener = new MetaStorageListener(storage, clusterTime);

return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
raftNodeId(localPeer),
configuration,
learnerListener,
RaftGroupEventsListener.noopLsnr,
Expand All @@ -379,6 +387,10 @@ private CompletableFuture<? extends RaftGroupService> startLearnerNode(
);
}

private static RaftNodeId raftNodeId(Peer localPeer) {
return new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer);
}

/**
* Sets the Meta Storage configuration.
*
Expand Down Expand Up @@ -837,6 +849,29 @@ public CompletableFuture<Long> recoveryFinishedFuture() {
return recoveryFinishedFuture;
}

/**
* Returns a future that will be completed with information about index and term of the Metastorage Raft group.
*
* <p>This method is special in the following regard: it can be called before the component gets started. The returned
* future will be completed after the component start.
*/
public CompletableFuture<IndexWithTerm> raftNodeIndex() {
return raftNodeStarted.thenApply(unused -> inBusyLock(busyLock, () -> {
RaftNodeId nodeId = raftNodeId(new Peer(clusterService.nodeName()));

IndexWithTerm indexWithTerm;
try {
indexWithTerm = raftMgr.raftNodeIndex(nodeId);
} catch (NodeStoppingException e) {
throw new CompletionException(e);
}

assert indexWithTerm != null : "Attempt to get index and term when Raft node is not started yet or already stopped)";

return indexWithTerm;
}));
}

@TestOnly
public CompletableFuture<MetaStorageServiceImpl> metaStorageService() {
return metaStorageSvcFut;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raft;

/**
* Raft index with the corresponding term.
*/
public class IndexWithTerm {
private final long index;
private final long term;

/** Constructor. */
public IndexWithTerm(long index, long term) {
this.index = index;
this.term = term;
}

/** Returns the index. */
public long index() {
return index;
}

/** Returns the term corresponding to the index. */
public long term() {
return term;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,12 @@ <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
*/
void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptionsConfigurer raftGroupOptionsConfigurer)
throws NodeStoppingException;

/**
* Returns information about index and term of the given node, or {@code null} if the group is not started.
*
* @param nodeId ID of the Raft node.
* @throws NodeStoppingException If the node is already being stopped.
*/
@Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) throws NodeStoppingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,19 @@ public void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptionsConfigure
}
}

@Override
public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}

try {
return raftServer.raftNodeIndex(nodeId);
} finally {
busyLock.leaveBusy();
}
}

private <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNodeInternal(
RaftNodeId nodeId,
PeersAndLearners configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.util.Set;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
Expand Down Expand Up @@ -108,6 +110,13 @@ boolean startRaftNode(
*/
void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions groupOptions);

/**
* Returns information about index and term of the given node, or {@code null} if the group is not started.
*
* @param nodeId ID of the Raft node.
*/
@Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId);

/**
* Returns local nodes running the given Raft group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl.ReadIndexEvent;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.NodeOptions;
Expand Down Expand Up @@ -577,6 +579,19 @@ public void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions groupOpt
}
}

@Override
public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) {
RaftGroupService service = nodes.get(nodeId);

if (service == null) {
return null;
}

LogId logId = service.getRaftNode().lastLogIndexAndTerm();

return new IndexWithTerm(logId.getIndex(), logId.getTerm());
}

/**
* Performs a {@code resetPeers} operation on raft node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.LogId;import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
Expand Down Expand Up @@ -328,4 +328,10 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* Returns the value of last replicated log index. Corresponding log entry might not yet be written to the log storage (no flush).
*/
long lastLogIndex();

/**
* Returns the value of last replicated log index (with its term). Corresponding log entry might not yet be written to the log storage
* (no flush).
*/
LogId lastLogIndexAndTerm();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2787,6 +2787,17 @@ public long lastLogIndex() {
}
}

@Override
public LogId lastLogIndexAndTerm() {
this.readLock.lock();
try {
return logManager.getLastLogId(false).copy();
}
finally {
this.readLock.unlock();
}
}

@OnlyForTest
ConfigurationEntry getConf() {
this.readLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,14 +551,6 @@ public class IgniteImpl implements Ignite {
failureProcessor
);

systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl(
name,
clusterSvc.topologyService(),
clusterSvc.messagingService(),
vaultMgr,
restarter
);

clock = new HybridClockImpl();

clockWaiter = new ClockWaiter(name, clock);
Expand Down Expand Up @@ -701,6 +693,15 @@ public class IgniteImpl implements Ignite {

metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageExtensionConfiguration.KEY).metaStorage());

systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl(
name,
clusterSvc.topologyService(),
clusterSvc.messagingService(),
vaultMgr,
restarter,
metaStorageMgr::raftNodeIndex
);

SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry
.getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();

Expand Down Expand Up @@ -1228,11 +1229,11 @@ CompletableFuture<Void> startAsync() {
failureProcessor,
clusterStateStorage,
clusterIdService,
systemDisasterRecoveryManager,
criticalWorkerRegistry,
nettyBootstrapFactory,
nettyWorkersRegistrar,
clusterSvc,
systemDisasterRecoveryManager,
restComponent,
partitionsLogStorageFactory,
msLogStorageFactory,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.disaster.system.message;

import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;

/**
* Message for retrieving index+term of the Metastorage Raft group on the recipient node.
*/
@Transferable(SystemDisasterRecoveryMessageGroup.METASTORAGE_INDEX_TERM_REQUEST)
public interface MetastorageIndexTermRequestMessage extends NetworkMessage {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.disaster.system.message;

import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;

/**
* Response to {@link MetastorageIndexTermRequestMessage}.
*/
@Transferable(SystemDisasterRecoveryMessageGroup.METASTORAGE_INDEX_TERM_RESPONSE)
public interface MetastorageIndexTermResponseMessage extends NetworkMessage {
/**
* Returns Raft index of the Metastorage group on the sender of this message.
*/
long raftIndex();

/**
* Returns term corresponding to the Raft index of the Metastorage group on the sender of this message.
*/
long raftTerm();
}
Loading