@@ -19,7 +19,7 @@ import SwiftyInstrumentsPackageDefinition

// package
fileprivate let packageID = "com.apple.actors.ActorInstruments"
fileprivate let packageVersion: String = "0.5.0" // TODO: match with project version
fileprivate let packageVersion: String = "0.6.0" // TODO: match with project version
fileprivate let packageTitle = "Actors"

// schema

@@ -22,7 +22,7 @@ import NIO
extension ClusterShellState {
/// If, and only if, the current node is a leader it performs a set of tasks, such as moving nodes to `.up` etc.
func collectLeaderActions() -> [LeaderAction] {
guard self.membership.isLeader(self.localNode) else {
guard self.membership.isLeader(self.selfNode) else {
Member Author

naming consistency

return [] // since we are not the leader, we perform no tasks
}

51 changes: 18 additions & 33 deletions Sources/DistributedActors/Cluster/ClusterShell.swift

@@ -437,23 +437,22 @@ extension ClusterShell {
}
)

let state = ClusterShellState(
var state = ClusterShellState(
settings: clusterSettings,
channel: chan,
events: self.clusterEvents,
gossiperControl: gossiperControl,
log: context.log
)

// loop through "self" cluster shell, which in result causes notifying all subscribers about cluster membership change
var firstGossip = Cluster.MembershipGossip(ownerNode: state.localNode)
_ = firstGossip.membership.join(state.localNode) // change will be put into effect by receiving the "self gossip"
firstGossip.incrementOwnerVersion()
context.system.cluster.updateMembershipSnapshot(state.membership)

gossiperControl.update(payload: firstGossip) // ????
context.myself.tell(.gossipFromGossiper(firstGossip))
// TODO: are we ok if we received another gossip first, not our own initial? should be just fine IMHO
// immediately join the owner node (us), as we always should be part of the membership
// this immediate join is important in case we immediately get a handshake from other nodes,
// and will need to reply to them with our `Member`.
if let change = state.latestGossip.membership.join(state.selfNode) {
// always update the snapshot before emitting events
context.system.cluster.updateMembershipSnapshot(state.membership)
self.clusterEvents.publish(.membershipChange(change))
}
Member Author

the fix™

Sending to self (done to avoid having another spot where we have to write "updateMembershipSnapshot and publish event") opened a window of opportunity for another message to be handled first -- and that message would then hit us when we had NO members at all in the membership, not even "us".

This would then cause the nonsensical handshake rejection, because "there is no local member".

Yet there always is a local member! It's impossible to not know the local member: it is us.
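
A minimal, self-contained sketch of the ordering issue, using hypothetical simplified `Member`/`Membership` types rather than the real DistributedActors API:

```swift
// Hypothetical, simplified stand-ins for the real membership types.
struct Member: Equatable {
    let node: String
}

struct Membership {
    private(set) var members: [Member] = []

    /// Joins the node and returns the resulting change, or nil if already a member.
    mutating func join(_ node: String) -> Member? {
        let member = Member(node: node)
        guard !members.contains(member) else { return nil }
        members.append(member)
        return member
    }

    func member(_ node: String) -> Member? {
        members.first { $0.node == node }
    }
}

let selfNode = "sact://first@127.0.0.1:7337" // hypothetical address, for illustration only
var membership = Membership()

// Before: the self-join travelled through a `.gossipFromGossiper` message to `myself`,
// so a handshake offer that was already enqueued could be processed while `membership`
// was still completely empty -- not even the local member existed yet.
//
// After: join the self node synchronously while constructing the state, then update the
// snapshot and publish the membership change, before any message gets handled.
if let change = membership.join(selfNode) {
    // here the real code calls updateMembershipSnapshot(...) and publishes .membershipChange(change)
    print("joined:", change.node)
}

// Every subsequently handled message now observes at least the local member.
assert(membership.member(selfNode) != nil)
```

With the eager join, the "empty membership" window simply never exists, regardless of which message arrives in the mailbox first.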


return self.ready(state: state)
}
@@ -539,7 +538,7 @@ extension ClusterShell {

/// Allows processing in one spot, all membership changes which we may have emitted in other places, due to joining, downing etc.
func receiveChangeMembershipRequest(_ context: ActorContext<Message>, event: Cluster.Event) -> Behavior<Message> {
self.tracelog(context, .receive(from: state.localNode.node), message: event)
self.tracelog(context, .receive(from: state.selfNode.node), message: event)
var state = state

// 1) IMPORTANT: We MUST apply and act on the incoming event FIRST, before handling the other events.
@@ -646,7 +645,7 @@ extension ClusterShell {
guard change.status < .down else {
return
}
guard change.member.uniqueNode != state.localNode else {
guard change.member.uniqueNode != state.selfNode else {
return
}
// TODO: make it cleaner? though we decided to go with manual peer management as the ClusterShell owns it, hm
@@ -672,7 +671,7 @@
internal func beginHandshake(_ context: ActorContext<Message>, _ state: ClusterShellState, with remoteNode: Node) -> Behavior<Message> {
var state = state

guard remoteNode != state.localNode.node else {
guard remoteNode != state.selfNode.node else {
state.log.debug("Ignoring attempt to handshake with myself; Could have been issued as confused attempt to handshake as induced by discovery via gossip?")
return .same
}
@@ -761,28 +760,14 @@ extension ClusterShell {
_ state: ClusterShellState,
_ offer: Wire.HandshakeOffer
) -> Wire.HandshakeReject? {
guard let member = state.localMember else {
// no local member? this is bad
state.log.warning(
"""
Received handshake while no local Cluster.Member available, this may indicate that we were removed form the cluster.
Rejecting handshake
""")
return .init(
version: state.settings.protocolVersion,
targetNode: state.localNode,
originNode: offer.originNode,
reason: "Node cannot be part of cluster, no member available.",
whenHandshakeReplySent: nil
)
}
Member Author

This rejection reason is nonsensical and was sporadically triggered in some of the tests; it is now made impossible for good.
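
A sketch of the "impossible by construction" idea, with hypothetical simplified types (not the real `ClusterShellState` or `Wire` shapes): once the self-member accessor is non-optional and traps on the unreachable missing case, the only rejection reason that can still be expressed is the legitimate one.

```swift
// Hypothetical, simplified sketch -- not the real ClusterShellState / Wire types.
enum MemberStatus: Int, Comparable {
    case joining, up, leaving, down

    static func < (lhs: Self, rhs: Self) -> Bool { lhs.rawValue < rhs.rawValue }
}

struct ShellState {
    let selfNode: String
    var statuses: [String: MemberStatus]

    // Before: the self member was Optional, forcing callers to invent a rejection for nil.
    // After: a missing self member is a programmer error, not a protocol outcome to handle.
    var selfMemberStatus: MemberStatus {
        guard let status = statuses[selfNode] else {
            fatalError("No member for \(selfNode) -- impossible by construction, we always join ourselves first.")
        }
        return status
    }
}

// The only rejection reason left in this sketch: we are already on our way out of the cluster.
func handshakeRejectionReason(_ state: ShellState) -> String? {
    state.selfMemberStatus >= .leaving ? "Node already leaving cluster." : nil
}

let state = ShellState(
    selfNode: "sact://first@127.0.0.1:7337", // hypothetical address, for illustration only
    statuses: ["sact://first@127.0.0.1:7337": .up]
)
assert(handshakeRejectionReason(state) == nil)
```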

let member = state.selfMember

if member.status.isAtLeast(.leaving) {
state.log.notice("Received handshake while already [\(member.status)]")

return .init(
version: state.settings.protocolVersion,
targetNode: state.localNode,
targetNode: state.selfNode,
originNode: offer.originNode,
reason: "Node already leaving cluster.",
whenHandshakeReplySent: nil
@@ -1129,7 +1114,7 @@ extension ClusterShell {
}

private func onRestInPeace(_ context: ActorContext<Message>, _ state: ClusterShellState, intendedNode: UniqueNode, fromNode: UniqueNode) -> Behavior<Message> {
let myselfNode = state.localNode
let myselfNode = state.selfNode

guard intendedNode == myselfNode else {
state.log.warning(
@@ -1196,12 +1181,12 @@ extension ClusterShell {
// FIXME: also close all associations (!!!)
switch $0 {
case .success:
context.log.info("Unbound server socket [\(addrDesc)], node: \(reflecting: state.localNode)")
context.log.info("Unbound server socket [\(addrDesc)], node: \(reflecting: state.selfNode)")
self.serializationPool.shutdown()
signalOnceUnbound.offerOnce(())
return .stop
case .failure(let err):
context.log.warning("Failed while unbinding server socket [\(addrDesc)], node: \(reflecting: state.localNode). Error: \(err)")
context.log.warning("Failed while unbinding server socket [\(addrDesc)], node: \(reflecting: state.selfNode). Error: \(err)")
self.serializationPool.shutdown()
signalOnceUnbound.offerOnce(())
throw err
@@ -1259,7 +1244,7 @@ extension ClusterShell {
// whenever we down a node we must ensure to confirm it to swim, so it won't keep monitoring it forever needlessly
self._swimRef?.tell(.local(SWIM.LocalMessage.confirmDead(memberToDown.uniqueNode)))

if memberToDown.uniqueNode == state.localNode {
if memberToDown.uniqueNode == state.selfNode {
// ==== ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Down(self node); ensuring SWIM knows about this and should likely initiate graceful shutdown
context.log.warning(

28 changes: 19 additions & 9 deletions Sources/DistributedActors/Cluster/ClusterShellState.swift

@@ -28,8 +28,9 @@ internal protocol ReadOnlyClusterState {
var handshakeBackoff: BackoffStrategy { get }

/// Unique address of the current node.
var localNode: UniqueNode { get }
var localMember: Cluster.Member? { get } // TODO: enforce that we always have the localMember
var selfNode: UniqueNode { get }
var selfMember: Cluster.Member { get }

var settings: ClusterSettings { get }
}

@@ -45,9 +46,18 @@ internal struct ClusterShellState: ReadOnlyClusterState {

let channel: Channel

let localNode: UniqueNode
var localMember: Cluster.Member? {
self.membership.uniqueMember(self.localNode)
let selfNode: UniqueNode
var selfMember: Cluster.Member {
if let member = self.membership.uniqueMember(self.selfNode) {
return member
} else {
fatalError("""
ClusterShellState.selfMember was nil! This should be impossible by construction, because a node ALWAYS knows about itself.
Please report a bug on the distributed-actors issue tracker. Details:
Membership: \(self.membership)
Settings: \(self.settings)
""")
}
}

let eventLoopGroup: EventLoopGroup
@@ -58,7 +68,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {

let allocator: ByteBufferAllocator

internal var _handshakes: [Node: HandshakeStateMachine.State] = [:]
var _handshakes: [Node: HandshakeStateMachine.State] = [:]

let gossiperControl: GossiperControl<Cluster.MembershipGossip, Cluster.MembershipGossip>

@@ -109,7 +119,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {
self.allocator = settings.allocator
self.eventLoopGroup = settings.eventLoopGroup ?? settings.makeDefaultEventLoopGroup()

self.localNode = settings.uniqueBindNode
self.selfNode = settings.uniqueBindNode
self._latestGossip = Cluster.MembershipGossip(ownerNode: settings.uniqueBindNode)

self.events = events
@@ -145,7 +155,7 @@ extension ClusterShellState {

let initiated = HandshakeStateMachine.InitiatedState(
settings: self.settings,
localNode: self.localNode,
localNode: self.selfNode,
connectTo: remoteNode
)
let handshakeState = HandshakeStateMachine.State.initiated(initiated)
@@ -452,7 +462,7 @@ extension ClusterShellState {
return .init(applied: changeWasApplied)
}

self.log.trace("Membership updated on [\(self.localNode)] by \(event): \(pretty: self.membership)")
self.log.trace("Membership updated on [\(self.selfNode)] by \(event): \(pretty: self.membership)")
return .init(applied: changeWasApplied)
}

@@ -147,7 +147,7 @@ internal struct HandshakeStateMachine {

let offer: Wire.HandshakeOffer
var boundAddress: UniqueNode {
self.state.localNode
self.state.selfNode
}

var protocolVersion: DistributedActors.Version {

@@ -33,15 +33,15 @@ final class ClusterLeaderActionsTests: XCTestCase {
var stateC: ClusterShellState!

var nodeA: UniqueNode {
self.stateA.localNode
self.stateA.selfNode
}

var nodeB: UniqueNode {
self.stateB.localNode
self.stateB.selfNode
}

var nodeC: UniqueNode {
self.stateC.localNode
self.stateC.selfNode
}

override func setUp() {

@@ -29,14 +29,14 @@ final class RemoteHandshakeStateMachineTests: XCTestCase {

func test_handshake_happyPath() throws {
let serverKernel = ClusterShellState.makeTestMock(side: .server)
let serverAddress = serverKernel.localNode
let serverAddress = serverKernel.selfNode

let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
settings.node.port = 2222
}

// client
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
let offer = clientInitiated.makeOffer()

// server
@@ -55,26 +55,26 @@

// then

serverCompleted.localNode.shouldEqual(serverKernel.localNode)
serverCompleted.remoteNode.shouldEqual(clientKernel.localNode)
serverCompleted.localNode.shouldEqual(serverKernel.selfNode)
serverCompleted.remoteNode.shouldEqual(clientKernel.selfNode)

clientCompleted.remoteNode.shouldEqual(serverKernel.localNode)
clientCompleted.localNode.shouldEqual(clientKernel.localNode)
clientCompleted.remoteNode.shouldEqual(serverKernel.selfNode)
clientCompleted.localNode.shouldEqual(clientKernel.selfNode)
}

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Version negotiation

func test_negotiate_server_shouldAcceptClient_newerPatch() throws {
let serverKernel = ClusterShellState.makeTestMock(side: .server)
let serverAddress = serverKernel.localNode
let serverAddress = serverKernel.selfNode

let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
settings.node.port = 2222
settings._protocolVersion.patch += 1
}

let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
let offer = clientInitiated.makeOffer()

// server
@@ -92,14 +92,14 @@

func test_negotiate_server_shouldRejectClient_newerMajor() throws {
let serverKernel = ClusterShellState.makeTestMock(side: .server)
let serverAddress = serverKernel.localNode
let serverAddress = serverKernel.selfNode

let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
settings.node.port = 2222
settings._protocolVersion.major += 1
}

let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
let clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)
let offer = clientInitiated.makeOffer()

// server
@@ -123,14 +123,14 @@

func test_onTimeout_shouldReturnNewHandshakeOffersMultipleTimes() throws {
let serverKernel = ClusterShellState.makeTestMock(side: .server)
let serverAddress = serverKernel.localNode
let serverAddress = serverKernel.selfNode

let clientKernel = ClusterShellState.makeTestMock(side: .client) { settings in
settings.node.port = 8228
}

// client
var clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.localNode, connectTo: serverAddress.node)
var clientInitiated = HSM.InitiatedState(settings: clientKernel.settings, localNode: clientKernel.selfNode, connectTo: serverAddress.node)

guard case .scheduleRetryHandshake = clientInitiated.onHandshakeTimeout() else {
throw shouldNotHappen("Expected retry attempt after handshake timeout")

14 changes: 7 additions & 7 deletions scripts/docs/generate_api.sh

@@ -56,13 +56,13 @@ if [[ "$(uname -s)" == "Linux" ]]; then
cd "$source_kitten_source_path" && swift build -c release && cd "$root_path"
fi
# generate
# for module in "${modules[@]}"; do
## if [[ ! -f "$root_path/.build/sourcekitten/$module.json" ]]; then
# # always generate, otherwise we miss things when we're iterating on adding docs.
# echo "Generating $root_path/.build/sourcekitten/$module.json ..."
# "$source_kitten_path/sourcekitten" doc --spm-module "$module" > "$root_path/.build/sourcekitten/$module.json"
## fi
# done
for module in "${modules[@]}"; do
# if [[ ! -f "$root_path/.build/sourcekitten/$module.json" ]]; then
# always generate, otherwise we miss things when we're iterating on adding docs.
echo "Generating $root_path/.build/sourcekitten/$module.json ..."
"$source_kitten_path/sourcekitten" doc --spm-module "$module" > "$root_path/.build/sourcekitten/$module.json"
# fi
done
fi

# prep index