From 0b2782c887af6c1d377e3fa9cd814217e50dc40f Mon Sep 17 00:00:00 2001
From: Konrad `ktoso` Malawski
Date: Tue, 29 Sep 2020 12:10:37 +0900
Subject: [PATCH 1/2] =cluster,handshake #724 immediately initialize
 membership, without "loop" through shell

The "loop" could cause a race condition where another node extends a
handshake to this node before the "loop through self, adding the myself
Member" has a chance to run. This manifested in tests as handshakes being
rejected with "no local member", which is nonsense; there always is a
locally known member after all.
---
 .../ActorInstrumentsPackageDefinition.swift   |  2 +-
 .../Cluster/ClusterShell+LeaderActions.swift  |  2 +-
 .../Cluster/ClusterShell.swift                | 51 +++++++------------
 .../Cluster/ClusterShellState.swift           | 28 ++++++----
 .../Cluster/HandshakeStateMachine.swift       |  2 +-
 .../Cluster/ClusterLeaderActionsTests.swift   |  6 +--
 .../RemotingHandshakeStateMachineTests.swift  | 24 ++++-----
 scripts/docs/generate_api.sh                  | 14 ++---
 8 files changed, 62 insertions(+), 67 deletions(-)

diff --git a/Instruments/GenActorInstruments/Sources/ActorInstrumentsPackageDefinition/ActorInstrumentsPackageDefinition.swift b/Instruments/GenActorInstruments/Sources/ActorInstrumentsPackageDefinition/ActorInstrumentsPackageDefinition.swift
index 3041256d1..cf4dc97e5 100644
--- a/Instruments/GenActorInstruments/Sources/ActorInstrumentsPackageDefinition/ActorInstrumentsPackageDefinition.swift
+++ b/Instruments/GenActorInstruments/Sources/ActorInstrumentsPackageDefinition/ActorInstrumentsPackageDefinition.swift
@@ -19,7 +19,7 @@ import SwiftyInstrumentsPackageDefinition

 // package
 fileprivate let packageID = "com.apple.actors.ActorInstruments"
-fileprivate let packageVersion: String = "0.5.0" // TODO: match with project version
+fileprivate let packageVersion: String = "0.6.0" // TODO: match with project version
 fileprivate let packageTitle = "Actors"

 // schema
diff --git a/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift b/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift
index cb0584a81..c8054f059 100644
--- a/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift
+++ b/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift
@@ -22,7 +22,7 @@ import NIO

 extension ClusterShellState {
     /// If, and only if, the current node is a leader it performs a set of tasks, such as moving nodes to `.up` etc.
     func collectLeaderActions() -> [LeaderAction] {
-        guard self.membership.isLeader(self.localNode) else {
+        guard self.membership.isLeader(self.selfNode) else {
             return [] // since we are not the leader, we perform no tasks
         }
diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift
index 5620051ac..aba25d815 100644
--- a/Sources/DistributedActors/Cluster/ClusterShell.swift
+++ b/Sources/DistributedActors/Cluster/ClusterShell.swift
@@ -437,7 +437,7 @@ extension ClusterShell {
                 }
             )

-            let state = ClusterShellState(
+            var state = ClusterShellState(
                 settings: clusterSettings,
                 channel: chan,
                 events: self.clusterEvents,
@@ -445,15 +445,14 @@ extension ClusterShell {
                 log: context.log
             )

-            // loop through "self" cluster shell, which in result causes notifying all subscribers about cluster membership change
-            var firstGossip = Cluster.MembershipGossip(ownerNode: state.localNode)
-            _ = firstGossip.membership.join(state.localNode) // change will be put into effect by receiving the "self gossip"
-            firstGossip.incrementOwnerVersion()
-            context.system.cluster.updateMembershipSnapshot(state.membership)
-
-            gossiperControl.update(payload: firstGossip) // ????
-            context.myself.tell(.gossipFromGossiper(firstGossip))
-            // TODO: are we ok if we received another gossip first, not our own initial? should be just fine IMHO
+            // Immediately join the owner node (us), as we should always be part of the membership.
+            // This immediate join is important in case we get a handshake from other nodes right away,
+            // since we will need to reply to them with our `Member`.
+            if let change = state.latestGossip.membership.join(state.selfNode) {
+                // always update the snapshot before emitting events
+                context.system.cluster.updateMembershipSnapshot(state.membership)
+                self.clusterEvents.publish(.membershipChange(change))
+            }

             return self.ready(state: state)
         }
@@ -539,7 +538,7 @@ extension ClusterShell {

     /// Allows processing in one spot, all membership changes which we may have emitted in other places, due to joining, downing etc.
     func receiveChangeMembershipRequest(_ context: ActorContext<Message>, event: Cluster.Event) -> Behavior<Message> {
-        self.tracelog(context, .receive(from: state.localNode.node), message: event)
+        self.tracelog(context, .receive(from: state.selfNode.node), message: event)
         var state = state

         // 1) IMPORTANT: We MUST apply and act on the incoming event FIRST, before handling the other events.
@@ -646,7 +645,7 @@ extension ClusterShell {
             guard change.status < .down else {
                 return
             }
-            guard change.member.uniqueNode != state.localNode else {
+            guard change.member.uniqueNode != state.selfNode else {
                 return
             }
             // TODO: make it cleaner? though we decided to go with manual peer management as the ClusterShell owns it, hm
@@ -672,7 +671,7 @@ extension ClusterShell {
     internal func beginHandshake(_ context: ActorContext<Message>, _ state: ClusterShellState, with remoteNode: Node) -> Behavior<Message> {
         var state = state

-        guard remoteNode != state.localNode.node else {
+        guard remoteNode != state.selfNode.node else {
             state.log.debug("Ignoring attempt to handshake with myself; Could have been issued as confused attempt to handshake as induced by discovery via gossip?")
             return .same
         }
@@ -761,28 +760,14 @@ extension ClusterShell {
         _ state: ClusterShellState,
         _ offer: Wire.HandshakeOffer
     ) -> Wire.HandshakeReject? {
-        guard let member = state.localMember else {
-            // no local member? this is bad
-            state.log.warning(
-                """
-                Received handshake while no local Cluster.Member available, this may indicate that we were removed form the cluster.
-                Rejecting handshake
-                """)
-            return .init(
-                version: state.settings.protocolVersion,
-                targetNode: state.localNode,
-                originNode: offer.originNode,
-                reason: "Node cannot be part of cluster, no member available.",
-                whenHandshakeReplySent: nil
-            )
-        }
+        let member = state.selfMember

         if member.status.isAtLeast(.leaving) {
             state.log.notice("Received handshake while already [\(member.status)]")
             return .init(
                 version: state.settings.protocolVersion,
-                targetNode: state.localNode,
+                targetNode: state.selfNode,
                 originNode: offer.originNode,
                 reason: "Node already leaving cluster.",
                 whenHandshakeReplySent: nil
             )
@@ -1129,7 +1114,7 @@ extension ClusterShell {
     }

     private func onRestInPeace(_ context: ActorContext<Message>, _ state: ClusterShellState, intendedNode: UniqueNode, fromNode: UniqueNode) -> Behavior<Message> {
-        let myselfNode = state.localNode
+        let myselfNode = state.selfNode

        guard myselfNode == myselfNode else {
            state.log.warning(
@@ -1196,12 +1181,12 @@ extension ClusterShell {
            // FIXME: also close all associations (!!!)
            switch $0 {
            case .success:
-                context.log.info("Unbound server socket [\(addrDesc)], node: \(reflecting: state.localNode)")
+                context.log.info("Unbound server socket [\(addrDesc)], node: \(reflecting: state.selfNode)")
                self.serializationPool.shutdown()
                signalOnceUnbound.offerOnce(())
                return .stop
            case .failure(let err):
-                context.log.warning("Failed while unbinding server socket [\(addrDesc)], node: \(reflecting: state.localNode). Error: \(err)")
+                context.log.warning("Failed while unbinding server socket [\(addrDesc)], node: \(reflecting: state.selfNode). Error: \(err)")
                self.serializationPool.shutdown()
                signalOnceUnbound.offerOnce(())
                throw err
@@ -1259,7 +1244,7 @@ extension ClusterShell {
        // whenever we down a node we must ensure to confirm it to swim, so it won't keep monitoring it forever needlessly
        self._swimRef?.tell(.local(SWIM.LocalMessage.confirmDead(memberToDown.uniqueNode)))

-        if memberToDown.uniqueNode == state.localNode {
+        if memberToDown.uniqueNode == state.selfNode {
            // ==== ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            // Down(self node); ensuring SWIM knows about this and should likely initiate graceful shutdown
            context.log.warning(
diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift
index 64b7eb8a6..c0bc65565 100644
--- a/Sources/DistributedActors/Cluster/ClusterShellState.swift
+++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift
@@ -28,8 +28,9 @@ internal protocol ReadOnlyClusterState {
     var handshakeBackoff: BackoffStrategy { get }

     /// Unique address of the current node.
-    var localNode: UniqueNode { get }
-    var localMember: Cluster.Member? { get } // TODO: enforce that we always have the localMember
+    var selfNode: UniqueNode { get }
+    var selfMember: Cluster.Member { get }
+
     var settings: ClusterSettings { get }
 }

@@ -45,9 +46,18 @@ internal struct ClusterShellState: ReadOnlyClusterState {

     let channel: Channel

-    let localNode: UniqueNode
-    var localMember: Cluster.Member? {
-        self.membership.uniqueMember(self.localNode)
+    let selfNode: UniqueNode
+    var selfMember: Cluster.Member {
+        if let member = self.membership.uniqueMember(self.selfNode) {
+            return member
+        } else {
+            fatalError("""
+                ClusterShellState.localMember was nil! This should be impossible by construction, because a node ALWAYS knows about itself.
+                Please report a bug on the distributed-actors issue tracker. Details:
+                Membership: \(self.membership)
+                Settings: \(self.settings)
+                """)
+        }
     }

     let eventLoopGroup: EventLoopGroup
@@ -58,7 +68,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {

     let allocator: ByteBufferAllocator

-    internal var _handshakes: [Node: HandshakeStateMachine.State] = [:]
+    var _handshakes: [Node: HandshakeStateMachine.State] = [:]

     let gossiperControl: GossiperControl

@@ -109,7 +119,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {
         self.allocator = settings.allocator
         self.eventLoopGroup = settings.eventLoopGroup ?? settings.makeDefaultEventLoopGroup()

-        self.localNode = settings.uniqueBindNode
+        self.selfNode = settings.uniqueBindNode
         self._latestGossip = Cluster.MembershipGossip(ownerNode: settings.uniqueBindNode)

         self.events = events
@@ -145,7 +155,7 @@ extension ClusterShellState {

         let initiated = HandshakeStateMachine.InitiatedState(
             settings: self.settings,
-            localNode: self.localNode,
+            localNode: self.selfNode,
             connectTo: remoteNode
         )
         let handshakeState = HandshakeStateMachine.State.initiated(initiated)
@@ -452,7 +462,7 @@ extension ClusterShellState {
             return .init(applied: changeWasApplied)
         }

-        self.log.trace("Membership updated on [\(self.localNode)] by \(event): \(pretty: self.membership)")
+        self.log.trace("Membership updated on [\(self.selfNode)] by \(event): \(pretty: self.membership)")

         return .init(applied: changeWasApplied)
     }
diff --git a/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift b/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift
index ceb24874f..2ae19df87 100644
--- a/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift
+++ b/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift
@@ -147,7 +147,7 @@ internal struct HandshakeStateMachine {
         let offer: Wire.HandshakeOffer

         var boundAddress: UniqueNode {
-            self.state.localNode
+            self.state.selfNode
         }

         var protocolVersion: DistributedActors.Version {
diff --git a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift
index 587510843..c93368a43 100644
--- a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift
+++ b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift
@@ -33,15 +33,15 @@ final class ClusterLeaderActionsTests: XCTestCase {
     var stateC: ClusterShellState!

     var nodeA: UniqueNode {
-        self.stateA.localNode
+        self.stateA.selfNode
     }

     var nodeB: UniqueNode {
-        self.stateB.localNode
+        self.stateB.selfNode
     }

     var nodeC: UniqueNode {
-        self.stateC.localNode
+        self.stateC.selfNode
     }

     override func setUp() {
diff --git a/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift b/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift
index 278024100..c9fd5bb38 100644
--- a/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift
+++ b/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift
@@ -29,14 +29,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {

     func test_handshake_happyPath() throws {
         let serverKernel = ClusterShellState.makeTestMock(side: .server)
-        let serverAddress = serverKernel.localNode
+        let serverAddress = serverKernel.selfNode

         let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
             settings.node.port = 2222
         }

         // client
-        let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
+        let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
         let offer = clientInitiated.makeOffer()

         // server
@@ -55,11 +55,11 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {

         // then

-        serverCompleted.localNode.shouldEqual(serverKernel.localNode)
-        serverCompleted.remoteNode.shouldEqual(clientKernel.localNode)
+        serverCompleted.localNode.shouldEqual(serverKernel.selfNode)
+        serverCompleted.remoteNode.shouldEqual(clientKernel.selfNode)

-        clientCompleted.remoteNode.shouldEqual(serverKernel.localNode)
-        clientCompleted.localNode.shouldEqual(clientKernel.localNode)
+        clientCompleted.remoteNode.shouldEqual(serverKernel.selfNode)
+        clientCompleted.localNode.shouldEqual(clientKernel.selfNode)
     }

     // ==== ------------------------------------------------------------------------------------------------------------
@@ -67,14 +67,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {

     func test_negotiate_server_shouldAcceptClient_newerPatch() throws {
         let serverKernel = ClusterShellState.makeTestMock(side: .server)
-        let serverAddress = serverKernel.localNode
+        let serverAddress = serverKernel.selfNode

         let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
             settings.node.port = 2222
             settings._protocolVersion.patch += 1
         }

-        let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
+        let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
         let offer = clientInitiated.makeOffer()

         // server
@@ -92,14 +92,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {

     func test_negotiate_server_shouldRejectClient_newerMajor() throws {
         let serverKernel = ClusterShellState.makeTestMock(side: .server)
-        let serverAddress = serverKernel.localNode
+        let serverAddress = serverKernel.selfNode

         let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
             settings.node.port = 2222
             settings._protocolVersion.major += 1
         }

-        let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
+        let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
         let offer = clientInitiated.makeOffer()

         // server
@@ -123,14 +123,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {

     func test_onTimeout_shouldReturnNewHandshakeOffersMultipleTimes() throws {
         let serverKernel = ClusterShellState.makeTestMock(side: .server)
-        let serverAddress = serverKernel.localNode
+        let serverAddress = serverKernel.selfNode

         let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
             settings.node.port = 8228
         }

         // client
-        var clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
+        var clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)

         guard case .scheduleRetryHandshake = clientInitiated.onHandshakeTimeout() else {
             throw shouldNotHappen("Expected retry attempt after handshake timeout")
diff --git a/scripts/docs/generate_api.sh b/scripts/docs/generate_api.sh
index 561163319..b91002e97 100755
--- a/scripts/docs/generate_api.sh
+++ b/scripts/docs/generate_api.sh
@@ -56,13 +56,13 @@ if [[ "$(uname -s)" == "Linux" ]]; then
         cd "$source_kitten_source_path" && swift build -c release && cd "$root_path"
     fi
     # generate
-#    for module in "${modules[@]}"; do
-##      if [[ ! -f "$root_path/.build/sourcekitten/$module.json" ]]; then
-#        # always generate, otherwise we miss things when we're iterating on adding docs.
-#        echo "Generating $root_path/.build/sourcekitten/$module.json ..."
-#        "$source_kitten_path/sourcekitten" doc --spm-module "$module" > "$root_path/.build/sourcekitten/$module.json"
-##      fi
-#    done
+    for module in "${modules[@]}"; do
+#      if [[ ! -f "$root_path/.build/sourcekitten/$module.json" ]]; then
+        # always generate, otherwise we miss things when we're iterating on adding docs.
+        echo "Generating $root_path/.build/sourcekitten/$module.json ..."
+        "$source_kitten_path/sourcekitten" doc --spm-module "$module" > "$root_path/.build/sourcekitten/$module.json"
+#      fi
+    done
 fi

 # prep index

From ebbf9274762e67293624b60c938973e7c407e020 Mon Sep 17 00:00:00 2001
From: Konrad `ktoso` Malawski
Date: Tue, 29 Sep 2020 17:52:25 +0900
Subject: [PATCH 2/2] Update Sources/DistributedActors/Cluster/ClusterShellState.swift

Co-authored-by: Yim Lee
---
 Sources/DistributedActors/Cluster/ClusterShellState.swift | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift
index c0bc65565..97f79fd08 100644
--- a/Sources/DistributedActors/Cluster/ClusterShellState.swift
+++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift
@@ -52,7 +52,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {
             return member
         } else {
             fatalError("""
-                ClusterShellState.localMember was nil! This should be impossible by construction, because a node ALWAYS knows about itself.
+                ClusterShellState.selfMember was nil! This should be impossible by construction, because a node ALWAYS knows about itself.
                 Please report a bug on the distributed-actors issue tracker. Details:
                 Membership: \(self.membership)
                 Settings: \(self.settings)
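
A note on the race itself, for reviewers: before PATCH 1/2 the shell joined
itself to the membership by sending the initial gossip through its own
mailbox, so an inbound handshake processed ahead of that message observed a
membership without a local member and was rejected. The following is a
minimal, self-contained sketch of the two orderings; MiniShell,
MiniMembership and handshake(from:) are invented names for illustration,
not the DistributedActors API.

struct Node: Hashable {
    let name: String
}

struct MiniMembership {
    private var members: Set<Node> = []

    mutating func join(_ node: Node) {
        self.members.insert(node)
    }

    func contains(_ node: Node) -> Bool {
        self.members.contains(node)
    }
}

final class MiniShell {
    let selfNode: Node
    var membership = MiniMembership()
    var mailbox: [(MiniShell) -> Void] = [] // stands in for the shell actor's message queue

    init(selfNode: Node, joinImmediately: Bool) {
        self.selfNode = selfNode
        if joinImmediately {
            // the fixed behavior: we are a member before any message is processed
            self.membership.join(selfNode)
        } else {
            // the old behavior: the self-join is "looped" through the mailbox,
            // racing against any handshake that arrives before it is processed
            self.mailbox.append { shell in shell.membership.join(shell.selfNode) }
        }
    }

    func handshake(from remote: Node) -> String {
        guard self.membership.contains(self.selfNode) else {
            return "reject: no local member" // the nonsensical rejection seen in tests
        }
        return "accept handshake from \(remote.name)"
    }
}

let racy = MiniShell(selfNode: Node(name: "A"), joinImmediately: false)
print(racy.handshake(from: Node(name: "B"))) // reject: no local member

let fixed = MiniShell(selfNode: Node(name: "A"), joinImmediately: true)
print(fixed.handshake(from: Node(name: "B"))) // accept handshake from B

Joining synchronously during initialization, before any message can be
processed, is also what allows selfMember to become non-optional in
ClusterShellState above.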