Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] async leadership WIP #1044

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,146 +42,170 @@ import NIO // Future
/// If a new member is selected as leader, a ``Cluster/Event`` carrying ``Cluster/LeadershipChange`` will be emitted.
/// Other actors may subscribe to `ClusterSystem.cluster.events` in order to receive and react to such changes,
/// e.g. if an actor should only perform its duties if it is residing on the current leader node.
public protocol LeaderElection {
internal protocol LeaderElection {
/// Select a member to become a leader out of the existing `Membership`.
///
/// Decisions about electing/selecting a leader may be performed asynchronously.
mutating func runElection(context: LeaderElectionContext, membership: Cluster.Membership) -> LeaderElectionResult
mutating func runElection(membership: Cluster.Membership) async throws -> Cluster.LeadershipChange?
}

public struct LeaderElectionContext {
public var log: Logger
public let loop: EventLoop

internal init<M>(_ ownerContext: _ActorContext<M>) {
self.log = ownerContext.log
self.loop = ownerContext.system._eventLoopGroup.next()
}

internal init(log: Logger, eventLoop: EventLoop) {
internal init(log: Logger) {
self.log = log
self.loop = eventLoop
}
}

/// Result of running a `LeaderElection`, which may be performed asynchronously (or not).
///
/// Synchronous leader elections are usually implemented by predictably ordering the nodes, e.g. ordering them by address
/// and picking the "lowest", which is a variant of "ranking" leader election. Asynchronous elections may involve having
/// to reach out to the other members and them performing a "vote" about who shall become the leader. As this involves
/// actor coordination, the result of such election is going to be provided asynchronously.
///
/// A change in leadership will result in a `Cluster.LeadershipChange` event being emitted in the system's cluster event stream.
public struct LeaderElectionResult: _AsyncResult {
public typealias Value = Cluster.LeadershipChange?
let future: EventLoopFuture<Cluster.LeadershipChange?>

init(_ future: EventLoopFuture<Cluster.LeadershipChange?>) {
self.future = future
}

public func _onComplete(_ callback: @escaping (Result<Cluster.LeadershipChange?, Error>) -> Void) {
self.future.whenComplete(callback)
}

public func withTimeout(after timeout: Duration) -> LeaderElectionResult {
LeaderElectionResult(self.future.withTimeout(after: timeout))
}
}
///// Result of running a `LeaderElection`, which may be performed asynchronously (or not).
/////
///// Synchronous leader elections are usually implemented by predictably ordering the nodes, e.g. ordering them by address
///// and picking the "lowest", which is a variant of "ranking" leader election. Asynchronous elections may involve having
///// to reach out to the other members and them performing a "vote" about who shall become the leader. As this involves
///// actor coordination, the result of such election is going to be provided asynchronously.
/////
///// A change in leadership will result in a `Cluster.LeadershipChange` event being emitted in the system's cluster event stream.
//public struct LeaderElectionResult: _AsyncResult {
// public typealias Value = Cluster.LeadershipChange?
// let future: EventLoopFuture<Cluster.LeadershipChange?>
//
// init(_ future: EventLoopFuture<Cluster.LeadershipChange?>) {
// self.future = future
// }
//
// public func _onComplete(_ callback: @escaping (Result<Cluster.LeadershipChange?, Error>) -> Void) {
// self.future.whenComplete(callback)
// }
//
// public func withTimeout(after timeout: Duration) -> LeaderElectionResult {
// LeaderElectionResult(self.future.withTimeout(after: timeout))
// }
//}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Leadership
// MARK: ClusterLeadership

/// Leadership encapsulates various `LeaderElection` strategies.
///
/// - SeeAlso: `LeaderElection`
public struct Leadership {}

extension Leadership {
final class Shell {
static let naming: _ActorNaming = "leadership"
public struct ClusterLeadership {}

private var membership: Cluster.Membership // FIXME: we need to ensure the membership is always up to date -- we need the initial snapshot or a diff from a zero state etc.
private var election: LeaderElection

init(_ election: LeaderElection) {
self.election = election
extension ClusterLeadership {

/// A leadership election strategy which determines the leader by observing the membership of the cluster,
/// and electing the member with the "lowest" address identity.
///
/// This election scheme does not require any cooperation between nodes, and all nodes determine the same leader given the same membership.
actor Shell {
typealias ActorSystem = ClusterSystem

let actorSystem: ActorSystem

// @ActorID.Metadata(\.path)
// var path: ActorPath

var strategy: LeaderElection
var log: Logger!
var clusterEventsTask: Task<Void, Never>?

var membership: Cluster.Membership = .empty

init(strategy: LeaderElection, actorSystem: ClusterSystem) async {
self.strategy = strategy
self.actorSystem = actorSystem

var log = actorSystem.log
log[metadataKey: "actor/path"] = "$leaderElection"
self.log = log
// self.path = try! ._user.appending("leaderElector")

// Prepare initial membership;
// Ensure we have our own node as joining right away, even if the events don't have it emitted yet
self.membership = .empty
}

var behavior: _Behavior<Cluster.Event> {
.setup { context in
context.log.trace("Configured with \(self.election)")
context.system.cluster.events.subscribe(context.myself)

// FIXME: we have to add "own node" since we're not getting the .snapshot... so we have to manually act as if..
_ = self.membership.applyMembershipChange(Cluster.MembershipChange(node: context.system.cluster.uniqueNode, previousStatus: nil, toStatus: .joining))
return self.runElection(context)
_ = self.membership.applyMembershipChange(Cluster.MembershipChange(node: actorSystem.cluster.uniqueNode, previousStatus: nil, toStatus: .joining))

self.clusterEventsTask = Task {
await listenToClusterEvents()
}
}

private var ready: _Behavior<Cluster.Event> {
.receive { context, event in

deinit {
self.clusterEventsTask?.cancel()
self.clusterEventsTask = nil
}

func listenToClusterEvents() async {
let clusterEvents = self.actorSystem.cluster.events

for try await event in clusterEvents {
switch event {
case .snapshot(let membership):
self.membership = membership
return .same


await self.runElection()

case .membershipChange(let change):
guard self.membership.applyMembershipChange(change) != nil else {
return .same // nothing changed, no need to select anew
continue // out of then whenLocal
}

return self.runElection(context)

await self.runElection()
case .reachabilityChange(let change):
_ = self.membership.applyReachabilityChange(change)

return self.runElection(context)

await self.runElection()
case .leadershipChange:
return .same // we are the source of such events!

continue // we are the source of such events!
case ._PLEASE_DO_NOT_EXHAUSTIVELY_MATCH_THIS_ENUM_NEW_CASES_MIGHT_BE_ADDED_IN_THE_FUTURE:
context.log.error("Received Cluster.Event [\(event)]. This should not happen, please file an issue.")
return .same
fatalError("Received impossible event: \(event)")
}
}
}

func runElection(_ context: _ActorContext<Cluster.Event>) -> _Behavior<Cluster.Event> {
var electionContext = LeaderElectionContext(context)
electionContext.log[metadataKey: "leadership/election"] = "\(String(reflecting: type(of: self.election)))"
let electionResult = self.election.runElection(context: electionContext, membership: self.membership)

// TODO: if/when we'd have some election scheme that is async, e.g. "vote" then this timeout should NOT be infinite and should be handled properly
return context.awaitResult(of: electionResult, timeout: .effectivelyInfinite) {
switch $0 {
case .success(.some(let leadershipChange)):
guard let changed = try self.membership.applyLeadershipChange(to: leadershipChange.newLeader) else {
context.log.trace("The leadership change that was decided on by \(self.election) results in no change from current leadership state.")
return self.ready
}
context.system.cluster.ref.tell(.requestMembershipChange(.leadershipChange(changed)))
return self.ready

case .success(.none):
// no change decided upon
return self.ready

case .failure(let err):
context.log.warning("Failed to select leader... Error: \(err)")
return self.ready

func runElection() async {
do {
let change = try await self.strategy.runElection(membership: self.membership)

guard let leadershipChange = change else {
// no leadership change was made
return
}

self.requestLeadershipChange(to: leadershipChange.newLeader)

} catch {
self.log.warning("Failed to select leader", metadata: [
"strategy": "\(self.strategy)",
"membership": "\(self.membership)",
"error": "\(error)",
])
return
}
}

func requestLeadershipChange(to newLeader: Cluster.Member?) {
guard let changed = try? self.membership.applyLeadershipChange(to: newLeader) else {
self.log.trace("The leadership change that was decided on by \(self.strategy) results in no change from current leadership state.")
return
}

self.actorSystem.cluster.ref.tell(.requestMembershipChange(.leadershipChange(changed)))
}
}

}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: LowestAddressReachableMember election strategy

extension Leadership {
extension ClusterLeadership {
/// Simple strategy which does not require any additional coordination from members to select a leader.
///
/// All `MemberStatus.joining`, `MemberStatus.up` _reachable_ members are sorted by their addresses,
Expand Down Expand Up @@ -224,45 +248,49 @@ extension Leadership {
/// potentially be unsafe given less than `minimumNrOfMembers` nodes.
///
public struct LowestReachableMember: LeaderElection {

var log: Logger

// TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
// guarantees, and you should consider using another scheme or consensus based modes.
let minimumNumberOfMembersToDecide: Int
let loseLeadershipIfBelowMinNrOfMembers: Bool

/// - param minimumNrOfMembers: minimum number of REACHABLE members when a leader can be elected.
public init(minimumNrOfMembers: Int, loseLeadershipIfBelowMinNrOfMembers: Bool = false) {
self.log = Logger(label: "\(Self.self)")
self.minimumNumberOfMembersToDecide = minimumNrOfMembers
self.loseLeadershipIfBelowMinNrOfMembers = loseLeadershipIfBelowMinNrOfMembers
}

public mutating func runElection(context: LeaderElectionContext, membership: Cluster.Membership) -> LeaderElectionResult {
public mutating func runElection(membership: Cluster.Membership) async throws -> Cluster.LeadershipChange? {
var membership = membership
let membersToSelectAmong = membership.members(atMost: .up, reachability: .reachable)

let enoughMembers = membersToSelectAmong.count >= self.minimumNumberOfMembersToDecide
if enoughMembers {
return self.selectByLowestAddress(context: context, membership: &membership, membersToSelectAmong: membersToSelectAmong)
return self.selectByLowestAddress(membership: &membership, membersToSelectAmong: membersToSelectAmong)
} else {
context.log.info("Not enough members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)] to run election, members: \(membersToSelectAmong)")
self.log.info("Not enough members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)] to run election, members: \(membersToSelectAmong)")
if self.loseLeadershipIfBelowMinNrOfMembers {
return self.notEnoughMembers(context: context, membership: &membership, membersToSelectAmong: membersToSelectAmong)
return self.notEnoughMembers(membership: &membership, membersToSelectAmong: membersToSelectAmong)
} else {
return self.belowMinMembersTryKeepStableLeader(context: context, membership: &membership)
return self.belowMinMembersTryKeepStableLeader(membership: &membership)
}
}
}

internal mutating func notEnoughMembers(context: LeaderElectionContext, membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> LeaderElectionResult {
internal mutating func notEnoughMembers(membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> Cluster.LeadershipChange? {
// not enough members to make a decision yet
context.log.trace("Not enough members to select leader from, minimum nr of members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)]")
self.log.trace("Not enough members to select leader from, minimum nr of members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)]")

if let currentLeader = membership.leader {
// Clear current leader and trigger `Cluster.LeadershipChange`
let change = try! membership.applyLeadershipChange(to: nil) // try!-safe, because changing leader to nil is safe
context.log.trace("Removing leader [\(currentLeader)]")
return .init(context.loop.next().makeSucceededFuture(change))
self.log.trace("Removing leader [\(currentLeader)]")
return change
} else {
return .init(context.loop.next().makeSucceededFuture(nil))
return nil
}
}

Expand All @@ -273,25 +301,25 @@ extension Leadership {
/// - it still is reachable and part of the membership
///
/// Other nodes MAY NOT be elected, as we are below the minimum members threshold, we can only keep an existing leader, but not elect new ones.
internal mutating func belowMinMembersTryKeepStableLeader(context: LeaderElectionContext, membership: inout Cluster.Membership) -> LeaderElectionResult {
internal mutating func belowMinMembersTryKeepStableLeader(membership: inout Cluster.Membership) -> Cluster.LeadershipChange? {
guard let currentLeader = membership.leader else {
// there was no leader previously, and now we are below `minimumNumberOfMembersToDecide` thus cannot select a new one
return .init(context.loop.next().makeSucceededFuture(nil)) // no change
return nil // no change
}

guard currentLeader.status <= .up else {
// the leader is not up anymore, and we have to remove it (cannot keep trusting it)
let change = try! membership.applyLeadershipChange(to: nil) // try!-safe, because changing leader to nil is safe
context.log.trace("Removing leader [\(currentLeader)], not enough members to elect new leader.")
return .init(context.loop.next().makeSucceededFuture(change))
self.log.trace("Removing leader [\(currentLeader)], not enough members to elect new leader.")
return change
}

// the leader is still up, regardless of reachability, we still trust it;
// as we do not have enough members to do another election, we stick to the node we know.
return .init(context.loop.next().makeSucceededFuture(nil))
return nil
}

internal mutating func selectByLowestAddress(context: LeaderElectionContext, membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> LeaderElectionResult {
internal mutating func selectByLowestAddress(membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> Cluster.LeadershipChange? {
let oldLeader = membership.leader

// select the leader, by lowest address
Expand All @@ -300,15 +328,15 @@ extension Leadership {
.first

if let change = try! membership.applyLeadershipChange(to: leader) { // try! safe, as we KNOW this member is part of membership
context.log.debug(
self.log.debug(
"Selected new leader: [\(oldLeader, orElse: "nil") -> \(leader, orElse: "nil")]",
metadata: [
"membership": "\(membership)",
]
)
return .init(context.loop.next().makeSucceededFuture(change))
return change
} else {
return .init(context.loop.next().makeSucceededFuture(nil)) // no change, e.g. the new/old leader are the same
return nil // no change, e.g. the new/old leader are the same
}
}
}
Expand All @@ -327,12 +355,12 @@ extension ClusterSystemSettings {

private let underlying: _LeadershipSelectionSettings

func make(_: ClusterSystemSettings) -> LeaderElection? {
internal func make(_: ClusterSystemSettings) -> LeaderElection? {
switch self.underlying {
case .none:
return nil
case .lowestReachable(let nr):
return Leadership.LowestReachableMember(minimumNrOfMembers: nr)
return ClusterLeadership.LowestReachableMember(minimumNrOfMembers: nr)
}
}

Expand Down
Loading