-
Notifications
You must be signed in to change notification settings - Fork 504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support raft node control #1137
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1137 +/- ##
============================================
- Coverage 65.71% 62.72% -3.00%
- Complexity 5748 5765 +17
============================================
Files 360 371 +11
Lines 29595 31273 +1678
Branches 4181 4425 +244
============================================
+ Hits 19449 19616 +167
- Misses 8170 9664 +1494
- Partials 1976 1993 +17
Continue to review full report at Codecov.
|
601a696
to
71064b5
Compare
f98773f
to
f056760
Compare
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
Outdated
Show resolved
Hide resolved
"leader"); | ||
} | ||
RaftBackendStoreProvider provider = (RaftBackendStoreProvider) | ||
g.storeProvider(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a method this.storeProvider(g)
|
||
@GET | ||
@Timed | ||
@Path("leader") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get-leader
Change-Id: I90e2bc97d3aaa1f78208d07fb71dbfcdef991c93
Change-Id: I4f1ce11c7178b6e24ca33b2bfb29e3dfe45ba407
hugegraph-api/src/main/java/com/baidu/hugegraph/api/raft/RaftAPI.java
Outdated
Show resolved
Hide resolved
...raph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java
Outdated
Show resolved
Hide resolved
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
Outdated
Show resolved
Hide resolved
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
Outdated
Show resolved
Hide resolved
Status status = this.node.transferLeadershipTo(peerId); | ||
if (!status.isOk()) { | ||
throw new BackendException("Failed to transafer leader to '%s', " + | ||
"raft error : %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improve
future.waitFinished(); | ||
} catch (RuntimeException e) { | ||
throw e; | ||
} catch (Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to e
} catch (RuntimeException e) { | ||
throw e; | ||
} catch (Throwable t) { | ||
throw new BackendException(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improve message
} catch (RuntimeException e) { | ||
throw e; | ||
} catch (Throwable t) { | ||
throw new BackendException(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} | ||
LOG.warn("Waiting raft group '{}' election cost {}s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Change-Id: I650b3888d6f19dbfdab30a0f1ed84d7a698f31bb
@@ -127,7 +127,7 @@ | |||
public Number queryNumber(Query query); | |||
|
|||
public String name(); | |||
public BackendStoreProvider storeProvider(); | |||
public RaftNodeManager raftNodeManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to other line
@@ -188,8 +190,11 @@ public String name() { | |||
} | |||
|
|||
@Override | |||
public BackendStoreProvider storeProvider() { | |||
return this.storeProvider; | |||
public RaftNodeManager raftNodeManager() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} catch (Throwable t) { | ||
throw new BackendException(t); | ||
} catch (Throwable e) { | ||
throw new BackendException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some error message
Change-Id: I91ab77b4d78a3ad3539018dc2bff67e43278d600
@Override | ||
public String toString() { | ||
return "StoreCommand{type=" + type.name() + ", " + | ||
"action=" + action.name() + "}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use string format if just for debug, or else use string builder with capacity
Change-Id: Iaa2ea4071e053a522b8de5dabed2f6e8664fef79
} catch (InterruptedException e) { | ||
throw new BackendException( | ||
"Waiting for raft group '%s' election is interrupted", | ||
e, group, "election"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused arg "election"
if (timeout > 0 && consumedTime >= timeout) { | ||
throw new BackendException( | ||
"Waiting for raft group '{}' election timeout({}ms)", | ||
group, "", consumedTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused arg ""
@Override | ||
public Map<String, String> getLeader() { | ||
PeerId leaderId = this.raftNode.leaderId(); | ||
E.checkState(leaderId != null, "There is no leader for raft group %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrap line before the message
Status status = this.raftNode.node().transferLeadershipTo(peerId); | ||
if (!status.isOk()) { | ||
throw new BackendException( | ||
"Failed to transafer leader to '%s', raft error : %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove space from "error :"
@Override | ||
public void transferLeaderTo(String endpoint) { | ||
PeerId peerId = PeerId.parsePeer(endpoint); | ||
Status status = this.raftNode.node().transferLeadershipTo(peerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should pass future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need
|
||
@Override | ||
public String toString() { | ||
return String.format("StoreCommand{type = %s, action = %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove space
Change-Id: Ifda87eb1388e2f969bcf928a3e81a3ae52b6b8dc
@@ -128,10 +128,11 @@ private Node initRaftNode() throws IOException { | |||
|
|||
private void submitCommand(StoreCommand command, StoreClosure closure) { | |||
// Wait leader elected | |||
this.waitLeaderElected(RaftSharedContext.NO_TIMEOUT); | |||
LeaderInfo leaderInfo = this.waitLeaderElected( | |||
RaftSharedContext.NO_TIMEOUT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align with this
} | ||
|
||
public void leaderTerm(long term) { | ||
this.leaderTerm.set(term); | ||
public void onLeaderInfoChange(PeerId peerId, boolean isLeader) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to leaderId
private static final LeaderInfo NO_LEADER = new LeaderInfo(null, false); | ||
|
||
private final PeerId leaderId; | ||
private final boolean isLeader; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to selfIsLeader
Change-Id: I4621b13eef2870059ed4dd44a59ae2180d17ab98
shift 3 | ||
;; | ||
# add-peer | ||
--add-peer|-r) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-a
function print_usage() { | ||
echo " usage: raft-tools.sh [options]" | ||
echo " options: " | ||
echo " -g,--get-leader \${graph} get the leader endpoint for graph" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep consistency,add “can be used on leader or follower node”
if (!status.isOk()) { | ||
throw new BackendException( | ||
"Failed to transafer leader to '%s', raft error: %s", | ||
peerId, status.getErrorMsg()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use “endpoint” for exception output
Change-Id: I128274d0d543c8510a06f2be42b28570b3a0873d
@@ -17,7 +17,7 @@ LOG_PATH=${HOME_PATH}/logs | |||
function print_usage() { | |||
echo " usage: raft-tools.sh [options]" | |||
echo " options: " | |||
echo " -g,--get-leader \${graph} get the leader endpoint for graph" | |||
echo " -g,--get-leader \${graph} get the leader endpoint for graph, can be used on leader or follower node" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add list-peer
Change-Id: I7b18711df4423c8255cf91bf747c80a573b89c25
@@ -71,6 +71,8 @@ private void registerRpcRequestProcessors() { | |||
RpcServer rpcServer = this.context.rpcServer(); | |||
rpcServer.registerProcessor(new StoreCommandRequestProcessor( | |||
this.context)); | |||
rpcServer.registerProcessor(new SetLeaderRequestProcessor(this.context)); | |||
rpcServer.registerProcessor(new ListPeersRequestProcessor(this.context)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge two processors
Change-Id: Ia34c3f8d40bc86dff144d51c80feaf50b16d7231
extends RpcRequestProcessor<ListPeersRequest> { | ||
|
||
private static final Logger LOG = Log.logger(ListPeersRequestProcessor.class); | ||
private static final Logger LOG = Log.logger(ListPeersProcessor.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
Status status = new Status(RaftError.UNKNOWN, | ||
"fowared request failed"); | ||
closure.failure(status, new BackendException( | ||
"Current node isn't leader, leader is " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align
LOG.debug("StoreCommandResponse status error"); | ||
Status status = new Status(RaftError.UNKNOWN, | ||
"fowared request failed"); | ||
closure.failure(status, new BackendException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define a var for BackendException
} else { | ||
Status status = new Status(RaftError.UNKNOWN, | ||
"fowared request failed"); | ||
future.failure(status, new BackendException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define a var for BackendException
extends RpcRequestProcessor<SetLeaderRequest> { | ||
|
||
private static final Logger LOG = Log.logger(SetLeaderRequestProcessor.class); | ||
private static final Logger LOG = Log.logger(SetLeaderProcessor.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
extends RpcRequestProcessor<StoreCommandRequest> { | ||
|
||
private static final Logger LOG = Log.logger(StoreCommandRequestProcessor.class); | ||
private static final Logger LOG = Log.logger( | ||
StoreCommandProcessor.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Change-Id: Ib2d4dcf0683b6816edbd8fa4da106ba1afceb277
e = new BackendException( | ||
"Current node isn't leader, leader is [%s], " + | ||
"failed to forward request to leader: %s", | ||
leaderId, resp.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer align
BackendException e = new BackendException(
"Current node...
HugeGraph g = graph(manager, graph); | ||
RaftNodeManager raftNodeManager = g.raftNodeManager(); | ||
if (raftNodeManager == null) { | ||
throw new HugeException("Only work on raft mode can list peers"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allowed list-peers operation when working on raft mode
private <T extends Message> RaftClosure<T> forwardToLeader(Message request) { | ||
PeerId leaderId = this.raftNode.leaderId(); | ||
return this.rpcForwarder.forwardToLeader(leaderId, request); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to line 150
public void setLeader(String endpoint) { | ||
PeerId newLeaderId = PeerId.parsePeer(endpoint); | ||
Node node = this.raftNode.node(); | ||
// No need to re-elect if already is new leader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improve "if expected endpoint has already been raft leader"
PeerId leaderId = this.raftNode.leaderId(); | ||
E.checkState(leaderId != null, | ||
"There is no leader for raft group %s", this.group); | ||
return ImmutableMap.of(this.group, leaderId.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if getLeader() returns group, seems other APIs are also needed
StoreCommandRequest request = builder.build(); | ||
|
||
RpcResponseClosure<StoreCommandResponse> responseClosure; | ||
responseClosure = new RpcResponseClosure<StoreCommandResponse>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace <StoreCommandResponse> with <>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will compile failed
E.checkState(!leaderId.equals(this.nodeId), | ||
"Invalid state: current node is the leader, there is " + | ||
"no need to forward the request"); | ||
LOG.debug("The node {} forward request to leader {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add '{}'
|
||
RaftClosure<T> future = new RaftClosure<>(); | ||
RpcResponseClosure<T> responseClosure; | ||
responseClosure = new RpcResponseClosure<T>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems one line is ok, and remove <T>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will compile failed
"fowared request failed"); | ||
BackendException e; | ||
e = new BackendException( | ||
"Current node isn't leader, leader is [%s], " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Change-Id: I3784211036b622afcb2a410d2255773f51d40eb4
@PathParam("graph") String graph, | ||
@QueryParam("endpoint") String endpoint) { | ||
public Map<String, String> setLeader(@Context GraphManager manager, | ||
@PathParam("graph") String graph, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align
@PathParam("graph") String graph, | ||
@QueryParam("endpoint") String endpoint) { | ||
public Map<String, String> transferLeader(@Context GraphManager manager, | ||
@PathParam("graph") String graph, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add group parameter
Change-Id: Ie7736a8867b0b0f4e47747debb5ca1026190c29e
* Support raft node control * expose RaftNodeManager instead of StoreProvider * manage leader info by ourselves * add list peers API * define rpcForwarder to handle forward request * Let raft api return with group * add group paramter in RaftAPI Change-Id: I90e2bc97d3aaa1f78208d07fb71dbfcdef991c93
No description provided.