diff --git a/Sources/DistributedActors/CRDT/CRDT+Gossip.swift b/Sources/DistributedActors/CRDT/CRDT+Gossip.swift index 27658f1b0..95215d544 100644 --- a/Sources/DistributedActors/CRDT/CRDT+Gossip.swift +++ b/Sources/DistributedActors/CRDT/CRDT+Gossip.swift @@ -20,7 +20,8 @@ extension CRDT { /// It collaborates with the Direct Replicator in order to avoid needlessly sending values to nodes which already know /// about them (e.g. through direct replication). final class GossipReplicatorLogic: GossipLogic { - typealias Envelope = CRDT.Gossip + typealias Gossip = CRDT.Gossip + typealias Acknowledgement = CRDT.GossipAck let identity: CRDT.Identity let context: Context @@ -54,7 +55,7 @@ extension CRDT { // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Spreading gossip - func selectPeers(peers: [AddressableActorRef]) -> [AddressableActorRef] { + func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] { // how many peers we select in each gossip round, // we could for example be dynamic and notice if we have 10+ nodes, we pick 2 members to speed up the dissemination etc. let n = 1 @@ -79,32 +80,34 @@ extension CRDT { self.latest } - func receivePayloadACK(target: AddressableActorRef, confirmedDeliveryOf envelope: CRDT.Gossip) { - guard (self.latest.map { $0.payload.equalState(to: envelope.payload) } ?? false) else { + func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: CRDT.Gossip) { + guard (self.latest.map { $0.payload.equalState(to: gossip.payload) } ?? false) else { // received an ack for something, however it's not the "latest" anymore, so we need to gossip to target anyway return } // TODO: in v3 this would translate to ACKing specific deltas for this target // good, the latest gossip is still the same as was confirmed here, so we can mark it acked - self.peersAckedOurLatestGossip.insert(target.address) + self.peersAckedOurLatestGossip.insert(peer.address) } // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Receiving gossip - func receiveGossip(origin: AddressableActorRef, payload: CRDT.Gossip) { + func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> CRDT.GossipAck? { // merge the datatype locally, and update our information about the origin's knowledge about this datatype // (does it already know about our data/all-deltas-we-are-aware-of or not) - self.mergeInbound(from: origin, payload) + self.mergeInbound(from: peer, gossip) // notify the direct replicator to update all local `actorOwned` CRDTs. // TODO: the direct replicator may choose to delay flushing this information a bit to avoid much data churn see `settings.crdt.` - self.replicatorControl.tellGossipWrite(id: self.identity, data: payload.payload) + self.replicatorControl.tellGossipWrite(id: self.identity, data: gossip.payload) + + return CRDT.GossipAck() } - func localGossipUpdate(payload: CRDT.Gossip) { - self.mergeInbound(from: nil, payload) + func receiveLocalGossipUpdate(_ gossip: CRDT.Gossip) { + self.mergeInbound(from: nil, gossip) // during the next gossip round we'll gossip the latest most-up-to date version now; // no need to schedule that, we'll be called when it's time. } @@ -117,8 +120,8 @@ extension CRDT { // and gossip replicator, so that's a downside that we'll eventually want to address. enum SideChannelMessage { - case localUpdate(Envelope) - case ack(origin: AddressableActorRef, payload: Envelope) + case localUpdate(Gossip) + case ack(origin: AddressableActorRef, payload: Gossip) } func receiveSideChannelMessage(message: Any) throws { @@ -177,15 +180,10 @@ extension CRDT.Identity: GossipIdentifier { extension CRDT { /// The gossip to be spread about a specific CRDT (identity). - struct Gossip: GossipEnvelopeProtocol, CustomStringConvertible, CustomPrettyStringConvertible { - struct Metadata: Codable {} - typealias Payload = StateBasedCRDT - - var metadata: Metadata - var payload: Payload + struct Gossip: Codable, CustomStringConvertible, CustomPrettyStringConvertible { + var payload: StateBasedCRDT - init(metadata: CRDT.Gossip.Metadata, payload: CRDT.Gossip.Payload) { - self.metadata = metadata + init(payload: StateBasedCRDT) { self.payload = payload } @@ -198,15 +196,15 @@ extension CRDT { } var description: String { - "CRDT.Gossip(metadata: \(metadata), payload: \(payload))" + "CRDT.Gossip(\(payload))" } } + + struct GossipAck: Codable {} } extension CRDT.Gossip { enum CodingKeys: CodingKey { - case metadata - case metadataManifest case payload case payloadManifest } @@ -218,10 +216,6 @@ extension CRDT.Gossip { let container = try decoder.container(keyedBy: CodingKeys.self) - let manifestData = try container.decode(Data.self, forKey: .metadata) - let manifestManifest = try container.decode(Serialization.Manifest.self, forKey: .metadataManifest) - self.metadata = try context.serialization.deserialize(as: Metadata.self, from: .data(manifestData), using: manifestManifest) - let payloadData = try container.decode(Data.self, forKey: .payload) let payloadManifest = try container.decode(Serialization.Manifest.self, forKey: .payloadManifest) self.payload = try context.serialization.deserialize(as: StateBasedCRDT.self, from: .data(payloadData), using: payloadManifest) @@ -237,9 +231,5 @@ extension CRDT.Gossip { let serializedPayload = try context.serialization.serialize(self.payload) try container.encode(serializedPayload.buffer.readData(), forKey: .payload) try container.encode(serializedPayload.manifest, forKey: .payloadManifest) - - let serializedMetadata = try context.serialization.serialize(self.metadata) - try container.encode(serializedMetadata.buffer.readData(), forKey: .metadata) - try container.encode(serializedMetadata.manifest, forKey: .metadataManifest) } } diff --git a/Sources/DistributedActors/CRDT/CRDT+Replication.swift b/Sources/DistributedActors/CRDT/CRDT+Replication.swift index 42a62a014..33a2369a7 100644 --- a/Sources/DistributedActors/CRDT/CRDT+Replication.swift +++ b/Sources/DistributedActors/CRDT/CRDT+Replication.swift @@ -278,6 +278,10 @@ extension CRDT { /// during this round public var gossipInterval: TimeAmount = .seconds(2) + /// Timeout used when asking another peer when spreading gossip. + /// Timeouts are logged, but by themselves not "errors", as we still eventually may be able to spread the payload to given peer. + public var gossipAcknowledgementTimeout: TimeAmount = .milliseconds(500) + /// Adds a random factor to the gossip interval, which is useful to avoid an entire cluster ticking "synchronously" /// at the same time, causing spikes in gossip traffic (as all nodes decide to gossip in the same second). /// diff --git a/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift b/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift index 90c7ea2aa..622ca3f67 100644 --- a/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift +++ b/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift @@ -33,7 +33,7 @@ extension CRDT.Replicator { typealias RemoteDeleteResult = CRDT.Replicator.RemoteCommand.DeleteResult private let directReplicator: CRDT.Replicator.Instance - private var gossipReplication: GossipControl! + private var gossipReplication: GossiperControl! // TODO: better name; this is the control from Gossip -> Local struct LocalControl { @@ -71,12 +71,13 @@ extension CRDT.Replicator { } ) - self.gossipReplication = try Gossiper.start( + self.gossipReplication = try Gossiper.spawn( context, name: "gossip", settings: Gossiper.Settings( - gossipInterval: self.settings.gossipInterval, - gossipIntervalRandomFactor: self.settings.gossipIntervalRandomFactor, + interval: self.settings.gossipInterval, + intervalRandomFactor: self.settings.gossipIntervalRandomFactor, + style: .acknowledged(timeout: .seconds(1)), peerDiscovery: .fromReceptionistListing(id: "crdt-gossip-replicator") ), makeLogic: { logicContext in @@ -267,10 +268,8 @@ extension CRDT.Replicator { _ id: CRDT.Identity, _ data: StateBasedCRDT ) { - let gossip = CRDT.Gossip( - metadata: .init(), - payload: data // TODO: v2, allow tracking the deltas here - ) + // TODO: v2, allow tracking the deltas here + let gossip = CRDT.Gossip(payload: data) self.gossipReplication.update(id, payload: gossip) } @@ -407,10 +406,7 @@ extension CRDT.Replicator { replyTo.tell(.success(updatedData)) // Update the data stored in the replicator (yeah today we store 2 copies in the replicators, we could converge them into one with enough effort) - let gossip = CRDT.Gossip( - metadata: .init(), - payload: updatedData - ) + let gossip = CRDT.Gossip(payload: updatedData) self.gossipReplication.update(id, payload: gossip) // TODO: v2, allow tracking the deltas here // Followed by notifying all owners since the CRDT might have been updated diff --git a/Sources/DistributedActors/Cluster/Cluster+Membership.swift b/Sources/DistributedActors/Cluster/Cluster+Membership.swift index 3652a4dd7..52a006c1d 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Membership.swift +++ b/Sources/DistributedActors/Cluster/Cluster+Membership.swift @@ -238,9 +238,9 @@ extension Cluster.Membership: Hashable { extension Cluster.Membership: CustomStringConvertible, CustomDebugStringConvertible, CustomPrettyStringConvertible { /// Pretty multi-line output of a membership, useful for manual inspection public var prettyDescription: String { - var res = "LEADER: \(self.leader, orElse: ".none")" + var res = "leader: \(self.leader, orElse: ".none")" for member in self._members.values.sorted(by: { $0.node.node.port < $1.node.node.port }) { - res += "\n \(reflecting: member.node) STATUS: [\(member.status.rawValue, leftPadTo: Cluster.MemberStatus.maxStrLen)]" + res += "\n \(reflecting: member.node) status [\(member.status.rawValue, leftPadTo: Cluster.MemberStatus.maxStrLen)]" } return res } diff --git a/Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift b/Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift deleted file mode 100644 index 6af79fdc6..000000000 --- a/Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift +++ /dev/null @@ -1,148 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIO - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Membership Gossip Logic - -final class MembershipGossipLogic: GossipLogic { - typealias Envelope = Cluster.Gossip - - private let context: Context - private lazy var localNode: UniqueNode = self.context.system.cluster.node - - private var latestGossip: Cluster.Gossip - private let notifyOnGossipRef: ActorRef - - init(_ context: Context, notifyOnGossipRef: ActorRef) { - self.context = context - self.notifyOnGossipRef = notifyOnGossipRef - self.latestGossip = .init(ownerNode: context.system.cluster.node) - } - - // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Spreading gossip - - // TODO: implement better, only peers which are "behind" - func selectPeers(peers: [AddressableActorRef]) -> [AddressableActorRef] { - // how many peers we select in each gossip round, - // we could for example be dynamic and notice if we have 10+ nodes, we pick 2 members to speed up the dissemination etc. - let n = 1 - - var selectedPeers: [AddressableActorRef] = [] - selectedPeers.reserveCapacity(n) - - for peer in peers.shuffled() - where selectedPeers.count < n && self.shouldGossipWith(peer) { - selectedPeers.append(peer) - } - - return selectedPeers - } - - func makePayload(target: AddressableActorRef) -> Cluster.Gossip? { - // today we don't trim payloads at all - self.latestGossip - } - - func receivePayloadACK(target: AddressableActorRef, confirmedDeliveryOf envelope: Cluster.Gossip) { - // nothing to do - } - - /// True if the peers is "behind" in terms of information it has "seen" (as determined by comparing our and its seen tables). - private func shouldGossipWith(_ peer: AddressableActorRef) -> Bool { - guard let remoteNode = peer.address.node else { - // targets should always be remote peers; one not having a node should not happen, let's ignore it as a gossip target - return false - } - -// guard let remoteSeenVersion = self.latestGossip.seen.version(at: remoteNode) else { - guard self.latestGossip.seen.version(at: remoteNode) != nil else { - // this peer has never seen any information from us, so we definitely want to push a gossip - return true - } - - // FIXME: this is longer than may be necessary, optimize some more - return true - - // TODO: optimize some more; but today we need to keep gossiping until all VVs are the same, because convergence depends on this -// switch self.latestGossip.seen.compareVersion(observedOn: self.localNode, to: remoteSeenVersion) { -// case .happenedBefore, .same: -// // we have strictly less information than the peer, no need to gossip to it -// return false -// case .concurrent, .happenedAfter: -// // we have strictly concurrent or more information the peer, gossip with it -// return true -// } - } - - // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Receiving gossip - - func receiveGossip(origin: AddressableActorRef, payload: Cluster.Gossip) { - self.mergeInbound(payload) - self.notifyOnGossipRef.tell(self.latestGossip) - } - - func localGossipUpdate(payload: Cluster.Gossip) { - self.mergeInbound(payload) - } - - // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Side-channel - - enum SideChannelMessage { - case localUpdate(Envelope) - } - - func receiveSideChannelMessage(message: Any) throws { - guard let sideChannelMessage = message as? SideChannelMessage else { - self.context.system.deadLetters.tell(DeadLetter(message, recipient: self.context.gossiperAddress)) - return - } - - switch sideChannelMessage { - case .localUpdate(let payload): - self.mergeInbound(payload) - } - } - - // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Utilities - - private func mergeInbound(_ incoming: Cluster.Gossip) { - _ = self.latestGossip.mergeForward(incoming: incoming) - // effects are signalled via the ClusterShell, not here (it will also perform a merge) // TODO: a bit duplicated, could we maintain it here? - } -} - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Membership Gossip Logic Control - -let MembershipGossipIdentifier: StringGossipIdentifier = "membership" - -extension GossipControl where GossipEnvelope == Cluster.Gossip { - func update(payload: GossipEnvelope) { - self.update(MembershipGossipIdentifier, payload: payload) - } - - func remove() { - self.remove(MembershipGossipIdentifier) - } - - func sideChannelTell(message: Any) { - self.sideChannelTell(MembershipGossipIdentifier, message: message) - } -} diff --git a/Sources/DistributedActors/Cluster/ClusterSettings.swift b/Sources/DistributedActors/Cluster/ClusterSettings.swift index b2de9c659..7995b0ed1 100644 --- a/Sources/DistributedActors/Cluster/ClusterSettings.swift +++ b/Sources/DistributedActors/Cluster/ClusterSettings.swift @@ -110,6 +110,12 @@ public struct ClusterSettings { public var membershipGossipInterval: TimeAmount = .seconds(1) + // since we talk to many peers one by one; even as we proceed to the next round after `membershipGossipInterval` + // it is fine if we get a reply from the previously gossiped to peer after same or similar timeout. No rush about it. + // + // A missing ACK is not terminal, may happen, and we'll then gossip with that peer again (e.g. if it ha had some form of network trouble for a moment). + public var membershipGossipAcknowledgementTimeout: TimeAmount = .seconds(1) + public var membershipGossipIntervalRandomFactor: Double = 0.2 // ==== ------------------------------------------------------------------------------------------------------------ diff --git a/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift b/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift index 60d26a966..3e8164773 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift @@ -27,7 +27,7 @@ extension ClusterShellState { } guard self.latestGossip.converged() else { - return [] // leader actions are only performed when + return [] // leader actions are only performed when up nodes are converged } func collectMemberUpMoves() -> [LeaderAction] { @@ -65,7 +65,8 @@ extension ClusterShellState { extension ClusterShell { func interpretLeaderActions( - _ system: ActorSystem, _ previousState: ClusterShellState, + _ system: ActorSystem, + _ previousState: ClusterShellState, _ leaderActions: [ClusterShellState.LeaderAction], file: String = #file, line: UInt = #line ) -> ClusterShellState { @@ -73,19 +74,6 @@ extension ClusterShell { return previousState } - guard previousState.latestGossip.converged() else { - previousState.log.warning( - "Skipping leader actions, gossip not converged", - metadata: [ - "tag": "leader-action", - "leader/actions": "\(leaderActions)", - "gossip/current": "\(previousState.latestGossip)", - "leader/interpret/location": "\(file):\(line)", - ] - ) - return previousState - } - var state = previousState state.log.trace( "Performing leader actions: \(leaderActions)", @@ -118,6 +106,8 @@ extension ClusterShell { ] ) + system.cluster.updateMembershipSnapshot(state.membership) + return state } @@ -145,7 +135,7 @@ extension ClusterShell { return } state._latestGossip.incrementOwnerVersion() - state.gossipControl.update(payload: state._latestGossip) + state.gossiperControl.update(payload: state._latestGossip) self.terminateAssociation(system, state: &state, memberToRemove.node) diff --git a/Sources/DistributedActors/Cluster/ClusterShell+Logging.swift b/Sources/DistributedActors/Cluster/ClusterShell+Logging.swift index 8cba021f7..c7f074a1a 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell+Logging.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell+Logging.swift @@ -85,7 +85,7 @@ extension ClusterShell { case send(to: Node) case receive(from: Node) case receiveUnique(from: UniqueNode) - case gossip(Cluster.Gossip) + case gossip(Cluster.MembershipGossip) var description: String { switch self { diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index a97091a41..f0874c44c 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -314,7 +314,7 @@ internal class ClusterShell { /// Gossiping is handled by /system/cluster/gossip, however acting on it still is our task, /// thus the gossiper forwards gossip whenever interesting things happen ("more up to date gossip") /// to the shell, using this message, so we may act on it -- e.g. perform leader actions or change membership that we store. - case gossipFromGossiper(Cluster.Gossip) + case gossipFromGossiper(Cluster.MembershipGossip) } // this is basically our API internally for this system @@ -422,23 +422,23 @@ extension ClusterShell { return context.awaitResultThrowing(of: chanElf, timeout: clusterSettings.bindTimeout) { (chan: Channel) in context.log.info("Bound to \(chan.localAddress.map { $0.description } ?? "")") - // TODO: Membership.Gossip? - let gossipControl: GossipControl = try Gossiper.start( + let gossiperControl: GossiperControl = try Gossiper.spawn( context, name: "\(ActorPath._clusterGossip.name)", - props: ._wellKnown, settings: .init( - gossipInterval: clusterSettings.membershipGossipInterval, - gossipIntervalRandomFactor: clusterSettings.membershipGossipIntervalRandomFactor, + interval: clusterSettings.membershipGossipInterval, + intervalRandomFactor: clusterSettings.membershipGossipIntervalRandomFactor, + style: .acknowledged(timeout: clusterSettings.membershipGossipInterval), peerDiscovery: .onClusterMember(atLeast: .joining, resolve: { member in - let resolveContext = ResolveContext.Message>(address: ._clusterGossip(on: member.node), system: context.system) + let resolveContext = ResolveContext.Message>(address: ._clusterGossip(on: member.node), system: context.system) return context.system._resolve(context: resolveContext).asAddressable() }) ), + props: ._wellKnown, makeLogic: { MembershipGossipLogic( $0, - notifyOnGossipRef: context.messageAdapter(from: Cluster.Gossip.self) { + notifyOnGossipRef: context.messageAdapter(from: Cluster.MembershipGossip.self) { Optional.some(Message.gossipFromGossiper($0)) } ) @@ -449,17 +449,17 @@ extension ClusterShell { settings: clusterSettings, channel: chan, events: self.clusterEvents, - gossipControl: gossipControl, + gossiperControl: gossiperControl, log: context.log ) // loop through "self" cluster shell, which in result causes notifying all subscribers about cluster membership change - var firstGossip = Cluster.Gossip(ownerNode: state.localNode) + var firstGossip = Cluster.MembershipGossip(ownerNode: state.localNode) _ = firstGossip.membership.join(state.localNode) // change will be put into effect by receiving the "self gossip" firstGossip.incrementOwnerVersion() context.system.cluster.updateMembershipSnapshot(state.membership) - gossipControl.update(payload: firstGossip) // ???? + gossiperControl.update(payload: firstGossip) // ???? context.myself.tell(.gossipFromGossiper(firstGossip)) // TODO: are we ok if we received another gossip first, not our own initial? should be just fine IMHO @@ -550,28 +550,37 @@ extension ClusterShell { self.tracelog(context, .receive(from: state.localNode.node), message: event) var state = state - let changeDirective = state.applyClusterEvent(event) - state = self.interpretLeaderActions(context.system, state, state.collectLeaderActions()) - - if case .membershipChange(let change) = event { - self.tryIntroduceGossipPeer(context, state, change: change) - } - - if changeDirective.applied { + // 1) IMPORTANT: We MUST apply and act on the incoming event FIRST, before handling the other events. + // This is because: + // - is the event is a `leadershipChange` applying it could make us become the leader + // - if that happens, the 2) phase will collect leader actions (perhaps for the first time), + // and issue any pending up/down events. + // - For consistency such events MUST only be issued AFTER we have emitted the leadership change (!) + // Otherwise subscribers may end up seeing joining->up changes BEFORE they see the leadershipChange, + // which is not strictly wrong per se, however it is very confusing -- we know there MUST be a leader + // somewhere in order to perform those moves, so it is confusing if such joining->up events were to be + // seen by a subscriber before they saw "we have a leader". + if state.applyClusterEvent(event).applied { state.latestGossip.incrementOwnerVersion() - // update the membership snapshot before publishing change events - context.system.cluster.updateMembershipSnapshot(state.membership) // we only publish the event if it really caused a change in membership, to avoid echoing "the same" change many times. self.clusterEvents.publish(event) } // else no "effective change", thus we do not publish events + // 2) Collect and interpret leader actions which may result changing the membership and publishing events for the changes + let actions: [ClusterShellState.LeaderAction] = state.collectLeaderActions() + state = self.interpretLeaderActions(context.system, state, actions) + + if case .membershipChange(let change) = event { + self.tryIntroduceGossipPeer(context, state, change: change) + } + return self.ready(state: state) } func receiveMembershipGossip( _ context: ActorContext, _ state: ClusterShellState, - gossip: Cluster.Gossip + gossip: Cluster.MembershipGossip ) -> Behavior { tracelog(context, .gossip(gossip), message: gossip) var state = state @@ -603,11 +612,9 @@ extension ClusterShell { } let leaderActions = state.collectLeaderActions() - if !leaderActions.isEmpty { - state.log.trace("Leadership actions upon gossip: \(leaderActions)", metadata: ["tag": "membership"]) - } - state = self.interpretLeaderActions(context.system, state, leaderActions) + + // definitely update the snapshot; even if no leader actions performed context.system.cluster.updateMembershipSnapshot(state.membership) return self.ready(state: state) @@ -636,12 +643,12 @@ extension ClusterShell { // TODO: make it cleaner? though we decided to go with manual peer management as the ClusterShell owns it, hm // TODO: consider receptionist instead of this; we're "early" but receptionist could already be spreading its info to this node, since we associated. - let gossipPeer: GossipShell.Ref = context.system._resolve( + let gossipPeer: GossipShell.Ref = context.system._resolve( context: .init(address: ._clusterGossip(on: change.member.node), system: context.system) ) // FIXME: make sure that if the peer terminated, we don't add it again in here, receptionist would be better then to power this... // today it can happen that a node goes down but we dont know yet so we add it again :O - state.gossipControl.introduce(peer: gossipPeer) + state.gossiperControl.introduce(peer: gossipPeer) } } @@ -1239,7 +1246,6 @@ extension ClusterShell { } state = self.interpretLeaderActions(context.system, state, state.collectLeaderActions()) - return state } diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift index 6db8de998..033bd0a08 100644 --- a/Sources/DistributedActors/Cluster/ClusterShellState.swift +++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift @@ -60,14 +60,14 @@ internal struct ClusterShellState: ReadOnlyClusterState { internal var _handshakes: [Node: HandshakeStateMachine.State] = [:] - let gossipControl: GossipControl + let gossiperControl: GossiperControl /// Updating the `latestGossip` causes the gossiper to be informed about it, such that the next time it does a gossip round /// it uses the latest gossip available. - var _latestGossip: Cluster.Gossip + var _latestGossip: Cluster.MembershipGossip /// Any change to the gossip data, is propagated to the gossiper immediately. - var latestGossip: Cluster.Gossip { + var latestGossip: Cluster.MembershipGossip { get { self._latestGossip } @@ -75,7 +75,7 @@ internal struct ClusterShellState: ReadOnlyClusterState { if self._latestGossip.membership == newValue.membership { self._latestGossip = newValue } else { - let next: Cluster.Gossip + let next: Cluster.MembershipGossip if self._latestGossip.version == newValue.version { next = newValue.incrementingOwnerVersion() } else { @@ -84,7 +84,7 @@ internal struct ClusterShellState: ReadOnlyClusterState { self._latestGossip = next } - self.gossipControl.update(payload: self._latestGossip) + self.gossiperControl.update(payload: self._latestGossip) } } @@ -101,7 +101,7 @@ internal struct ClusterShellState: ReadOnlyClusterState { settings: ClusterSettings, channel: Channel, events: EventStream, - gossipControl: GossipControl, + gossiperControl: GossiperControl, log: Logger ) { self.log = log @@ -110,10 +110,10 @@ internal struct ClusterShellState: ReadOnlyClusterState { self.eventLoopGroup = settings.eventLoopGroup ?? settings.makeDefaultEventLoopGroup() self.localNode = settings.uniqueBindNode - self._latestGossip = Cluster.Gossip(ownerNode: settings.uniqueBindNode) + self._latestGossip = Cluster.MembershipGossip(ownerNode: settings.uniqueBindNode) self.events = events - self.gossipControl = gossipControl + self.gossiperControl = gossiperControl self.channel = channel } @@ -251,12 +251,23 @@ extension ClusterShellState { return .negotiateIncoming(fsm) } - guard existingAssociation == nil else { - let error = HandshakeStateMachine.HandshakeConnectionError( - node: offer.originNode.node, - message: "Terminating this connection, the node [\(offer.originNode)] is already associated. Possibly a delayed handshake retry message was delivered?" - ) - return .abortIncomingHandshake(error) + if let assoc = existingAssociation { + switch assoc.state { + case .associating: + () // continue, we'll perform the tie-breaker logic below + case .associated: + let error = HandshakeStateMachine.HandshakeConnectionError( + node: offer.originNode.node, + message: "Terminating this connection, the node [\(offer.originNode)] is already associated. Possibly a delayed handshake retry message was delivered?" + ) + return .abortIncomingHandshake(error) + case .tombstone: + let error = HandshakeStateMachine.HandshakeConnectionError( + node: offer.originNode.node, + message: "Terminating this connection, the node [\(offer.originNode)] is already tombstone-ed. Possibly a delayed handshake retry message was delivered?" + ) + return .abortIncomingHandshake(error) + } } guard let inProgress = self._handshakes[offer.originNode.node] else { diff --git a/Sources/DistributedActors/Cluster/Cluster+Gossip+Serialization.swift b/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossip+Serialization.swift similarity index 93% rename from Sources/DistributedActors/Cluster/Cluster+Gossip+Serialization.swift rename to Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossip+Serialization.swift index a28636aa4..4a133046f 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Gossip+Serialization.swift +++ b/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossip+Serialization.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -extension Cluster.Gossip: ProtobufRepresentable { +extension Cluster.MembershipGossip: ProtobufRepresentable { typealias ProtobufRepresentation = ProtoClusterMembershipGossip public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { @@ -36,13 +36,13 @@ extension Cluster.Gossip: ProtobufRepresentable { public init(fromProto proto: ProtobufRepresentation, context: Serialization.Context) throws { guard proto.ownerUniqueNodeID != 0 else { - throw SerializationError.missingField("ownerUniqueNodeID", type: "\(reflecting: Cluster.Gossip.self)") + throw SerializationError.missingField("ownerUniqueNodeID", type: "\(reflecting: Cluster.MembershipGossip.self)") } guard proto.hasMembership else { - throw SerializationError.missingField("membership", type: "\(reflecting: Cluster.Gossip.self)") + throw SerializationError.missingField("membership", type: "\(reflecting: Cluster.MembershipGossip.self)") } guard proto.hasSeenTable else { - throw SerializationError.missingField("seenTable", type: "\(reflecting: Cluster.Gossip.self)") + throw SerializationError.missingField("seenTable", type: "\(reflecting: Cluster.MembershipGossip.self)") } let membership = try Cluster.Membership(fromProto: proto.membership, context: context) @@ -52,7 +52,7 @@ extension Cluster.Gossip: ProtobufRepresentable { throw SerializationError.unableToDeserialize(hint: "Missing member for ownerUniqueNodeID, members: \(membership)") } - var gossip = Cluster.Gossip(ownerNode: ownerNode) + var gossip = Cluster.MembershipGossip(ownerNode: ownerNode) gossip.membership = membership gossip.seen.underlying.reserveCapacity(proto.seenTable.rows.count) for row in proto.seenTable.rows { diff --git a/Sources/DistributedActors/Cluster/Cluster+Gossip.swift b/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossip.swift similarity index 90% rename from Sources/DistributedActors/Cluster/Cluster+Gossip.swift rename to Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossip.swift index 08964a01f..9bf08555e 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Gossip.swift +++ b/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossip.swift @@ -19,24 +19,14 @@ extension Cluster { /// Gossip payload about members in the cluster. /// /// Used to guarantee phrases like "all nodes have seen a node A in status S", upon which the Leader may act. - struct Gossip: ActorMessage, Equatable { + struct MembershipGossip: ActorMessage, Equatable { let owner: UniqueNode /// A table maintaining our perception of other nodes views on the version of membership. /// Each row in the table represents what versionVector we know the given node has observed recently. /// It may have in the mean time of course observed a new version already. // TODO: There is tons of compression opportunity about not having to send full tables around in general, but for now we will just send them around // FIXME: ensure that we never have a seen entry for a non-member - // bad: "actor/message": Gossip( - // owner: sact://first:2342486320@127.0.0.1:9001, - // seen: Cluster.Gossip.SeenTable( - // [sact://second:4264003847@127.0.0.1:9002: [uniqueNode:sact://second@127.0.0.1:9002: 2], - // sact://first:2342486320@127.0.0.1:9001: [uniqueNode:sact://first@127.0.0.1:9001: 4, uniqueNode:sact://second@127.0.0.1:9002: 2]] - // ), - // membership: Membership(count: 2, leader: Member(sact://first@127.0.0.1:9001, status: joining, reachability: reachable), - // members: [ - // Member(sact://first:2342486320@127.0.0.1:9001, status: joining, reachability: reachable), - // Member(sact://second-REPLACEMENT:871659343@127.0.0.1:9002, status: joining, reachability: reachable)])) - var seen: Cluster.Gossip.SeenTable + var seen: Cluster.MembershipGossip.SeenTable /// The version vector of this gossip and the `Membership` state owned by it. var version: VersionVector { self.seen.underlying[self.owner]! // !-safe, since we _always_ know our own world view @@ -49,7 +39,7 @@ extension Cluster { init(ownerNode: UniqueNode) { self.owner = ownerNode // self.seen = Cluster.Gossip.SeenTable(myselfNode: ownerNode, version: VersionVector((.uniqueNode(ownerNode), 1))) - self.seen = Cluster.Gossip.SeenTable(myselfNode: ownerNode, version: VersionVector()) + self.seen = Cluster.MembershipGossip.SeenTable(myselfNode: ownerNode, version: VersionVector()) // The actual payload self.membership = .empty // MUST be empty, as on the first "self gossip, we cause all ClusterEvents @@ -68,7 +58,7 @@ extension Cluster { /// Merge an incoming gossip _into_ the current gossip. /// Ownership of this gossip is retained, versions are bumped, and membership is merged. - mutating func mergeForward(incoming: Gossip) -> MergeDirective { + mutating func mergeForward(incoming: MembershipGossip) -> MergeDirective { var incoming = incoming // 1) decide the relationship between this gossip and the incoming one @@ -169,25 +159,12 @@ extension Cluster { } } -extension Cluster.Gossip: GossipEnvelopeProtocol { - typealias Metadata = SeenTable - typealias Payload = Self - - var metadata: Metadata { - self.seen - } - - var payload: Payload { - self - } -} - -extension Cluster.Gossip: CustomPrettyStringConvertible {} +extension Cluster.MembershipGossip: CustomPrettyStringConvertible {} // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Cluster.Gossip.SeenTable -extension Cluster.Gossip { +extension Cluster.MembershipGossip { /// A table containing information about which node has seen the gossip at which version. /// /// It is best visualized as a series of views (by "owners" of a row) onto the state of the cluster. @@ -300,9 +277,9 @@ extension Cluster.Gossip { } } -extension Cluster.Gossip.SeenTable: CustomStringConvertible, CustomPrettyStringConvertible { +extension Cluster.MembershipGossip.SeenTable: CustomStringConvertible, CustomPrettyStringConvertible { public var description: String { - "Cluster.Gossip.SeenTable(\(self.underlying))" + "Cluster.MembershipGossip.SeenTable(\(self.underlying))" } public func prettyDescription(depth: Int) -> String { diff --git a/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossipLogic.swift b/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossipLogic.swift new file mode 100644 index 000000000..578a1e996 --- /dev/null +++ b/Sources/DistributedActors/Cluster/MembershipGossip/Cluster+MembershipGossipLogic.swift @@ -0,0 +1,219 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Membership Gossip Logic + +/// The logic of the membership gossip. +/// +/// Membership gossip is what is used to reach cluster "convergence" upon which a leader may perform leader actions. +/// See `Cluster.MembershipGossip.converged` for more details. +final class MembershipGossipLogic: GossipLogic, CustomStringConvertible { + typealias Gossip = Cluster.MembershipGossip + typealias Acknowledgement = Cluster.MembershipGossip + + private let context: Context + internal lazy var localNode: UniqueNode = self.context.system.cluster.node + + internal var latestGossip: Cluster.MembershipGossip + private let notifyOnGossipRef: ActorRef + + /// We store and use a shuffled yet stable order for gossiping peers. + /// See `updateActivePeers` for details. + private var peers: [AddressableActorRef] = [] + /// Constantly mutated by `nextPeerToGossipWith` in an effort to keep order in which we gossip with nodes evenly distributed. + /// This follows our logic in SWIM, and has the benefit that we never get too chatty with one specific node (as in the worst case it may be unreachable or down already). + private var _peerToGossipWithIndex: Int = 0 + + /// During 1:1 gossip interactions, update this table, which means "we definitely know the specific node has seen our version VV at ..." + /// + /// See `updateActivePeers` and `receiveGossip` for details. + // TODO: This can be optimized and it's enough if we keep a digest of the gossips; this way ACKs can just send the digest as well saving space. + private var lastGossipFrom: [AddressableActorRef: Cluster.MembershipGossip] = [:] + + init(_ context: Context, notifyOnGossipRef: ActorRef) { + self.context = context + self.notifyOnGossipRef = notifyOnGossipRef + self.latestGossip = .init(ownerNode: context.system.cluster.node) + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Spreading gossip + + func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] { + // how many peers we select in each gossip round, + // we could for example be dynamic and notice if we have 10+ nodes, we pick 2 members to speed up the dissemination etc. + let n = 1 + + self.updateActivePeers(peers) + var selectedPeers: [AddressableActorRef] = [] + selectedPeers.reserveCapacity(min(n, self.peers.count)) + + /// Trust the order of peers in gossipPeers for the selection; see `updateActivePeers` for logic of the ordering. + for peer in self.peers where selectedPeers.count < n { + if self.shouldGossipWith(peer) { + selectedPeers.append(peer) + } + } + + return selectedPeers + } + + private func updateActivePeers(_ peers: [AddressableActorRef]) { + if let changed = Self.peersChanged(known: self.peers, current: peers) { + // 1) remove any peers which are no longer active + // - from the peers list + // - from their gossip storage, we'll never gossip with them again after all + if !changed.removed.isEmpty { + let removedPeers = Set(changed.removed) + self.peers = self.peers.filter { !removedPeers.contains($0) } + changed.removed.forEach { removedPeer in + _ = self.lastGossipFrom.removeValue(forKey: removedPeer) + } + } + + for peer in changed.added { + // Newly added members are inserted at a random spot in the list of members + // to ping, to have a better distribution of messages to this node from all + // other nodes. If for example all nodes would add it to the end of the list, + // it would take a longer time until it would be pinged for the first time + // and also likely receive multiple pings within a very short time frame. + // + // This is adopted from the SWIM membership implementation and related papers. + let insertIndex = Int.random(in: self.peers.startIndex ... self.peers.endIndex) + self.peers.insert(peer, at: insertIndex) + } + } + } + + func makePayload(target: AddressableActorRef) -> Cluster.MembershipGossip? { + // today we don't trim payloads at all + // TODO: trim some information? + self.latestGossip + } + + /// True if the peers is "behind" in terms of information it has "seen" (as determined by comparing our and its seen tables). + // TODO: Implement stricter-round robin, the same way as our SWIM impl does, see `nextMemberToPing` + // This hardens the implementation against gossiping with the same node multiple times in a row. + // Note that we do NOT need to worry about filtering out dead peers as this is automatically handled by the gossip shell. + private func shouldGossipWith(_ peer: AddressableActorRef) -> Bool { + guard peer.address.node != nil else { + // targets should always be remote peers; one not having a node should not happen, let's ignore it as a gossip target + return false + } + + guard let lastSeenGossipFromPeer = self.lastGossipFrom[peer] else { + // it's a peer we have not gotten any gossip from yet + return true + } + + return self.latestGossip.seen != lastSeenGossipFromPeer.seen + } + + // TODO: may also want to return "these were removed" if we need to make any internal cleanup + static func peersChanged(known: [AddressableActorRef], current: [AddressableActorRef]) -> PeersChanged? { + // TODO: a bit lazy implementation + let knownSet = Set(known) + let currentSet = Set(current) + + let added = currentSet.subtracting(knownSet) + let removed = knownSet.subtracting(currentSet) + + if added.isEmpty, removed.isEmpty { + return nil + } else { + return PeersChanged( + added: added, + removed: removed + ) + } + } + + struct PeersChanged { + let added: Set + let removed: Set + + init(added: Set, removed: Set) { + assert(!added.isEmpty || !removed.isEmpty, "PeersChanged yet both added/removed are empty!") + self.added = added + self.removed = removed + } + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Receiving gossip + + func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> Acknowledgement? { + // 1) mark that from that specific peer, we know it observed at least that version + self.lastGossipFrom[peer] = gossip + + // 2) move forward the gossip we store + self.mergeInbound(gossip: gossip) + + // 3) notify listeners + self.notifyOnGossipRef.tell(self.latestGossip) + + // FIXME: optimize ack reply; this can contain only rows of seen tables where we are "ahead" (and always "our" row) + // no need to send back the entire tables if it's the same up to date ones as we just received + return self.latestGossip + } + + func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: Cluster.MembershipGossip) { + // 1) store the direct gossip we got from this peer; we can use this to know if there's no need to gossip to that peer by inspecting seen table equality + self.lastGossipFrom[peer] = acknowledgement + + // 2) move forward the gossip we store + self.mergeInbound(gossip: acknowledgement) + + // 3) notify listeners + self.notifyOnGossipRef.tell(self.latestGossip) + } + + func receiveLocalGossipUpdate(_ gossip: Cluster.MembershipGossip) { + self.mergeInbound(gossip: gossip) + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Utilities + + private func mergeInbound(gossip: Cluster.MembershipGossip) { + _ = self.latestGossip.mergeForward(incoming: gossip) + // effects are signalled via the ClusterShell, not here (it will also perform a merge) // TODO: a bit duplicated, could we maintain it here? + } + + var description: String { + "MembershipGossipLogic(\(self.localNode))" + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Membership Gossip Logic Control + +let MembershipGossipIdentifier: StringGossipIdentifier = "membership" + +extension GossiperControl where Gossip == Cluster.MembershipGossip { + func update(payload: Gossip) { + self.update(MembershipGossipIdentifier, payload: payload) + } + + func remove() { + self.remove(MembershipGossipIdentifier) + } + + func sideChannelTell(message: Any) { + self.sideChannelTell(MembershipGossipIdentifier, message: message) + } +} diff --git a/Sources/DistributedActors/Gossip/Gossip+Serialization.swift b/Sources/DistributedActors/Gossip/Gossip+Serialization.swift index 912144a3b..d3f08616a 100644 --- a/Sources/DistributedActors/Gossip/Gossip+Serialization.swift +++ b/Sources/DistributedActors/Gossip/Gossip+Serialization.swift @@ -37,7 +37,6 @@ extension GossipShell.Message: Codable { let container = try decoder.container(keyedBy: CodingKeys.self) switch try container.decode(DiscriminatorKeys.self, forKey: ._case) { case .gossip: - let identifierManifest = try container.decode(Serialization.Manifest.self, forKey: .gossip_identifier_manifest) let identifierPayload = try container.decode(Data.self, forKey: .gossip_identifier) let identifierAny = try context.serialization.deserializeAny(from: .data(identifierPayload), using: identifierManifest) @@ -51,10 +50,10 @@ extension GossipShell.Message: Codable { // FIXME: sometimes we could encode raw and not via the Data -- think about it and fix it let payloadManifest = try container.decode(Serialization.Manifest.self, forKey: .gossip_payload_manifest) let payloadPayload = try container.decode(Data.self, forKey: .gossip_payload) - let payload = try context.serialization.deserialize(as: Envelope.self, from: .data(payloadPayload), using: payloadManifest) + let payload = try context.serialization.deserialize(as: Gossip.self, from: .data(payloadPayload), using: payloadManifest) - let ackRefAddress = try container.decode(ActorAddress.self, forKey: .ackRef) - let ackRef = context.resolveActorRef(GossipACK.self, identifiedBy: ackRefAddress) + let ackRefAddress = try container.decodeIfPresent(ActorAddress.self, forKey: .ackRef) + let ackRef = ackRefAddress.map { context.resolveActorRef(Acknowledgement.self, identifiedBy: $0) } self = .gossip(identity: identifier, origin: origin, payload, ackRef: ackRef) } @@ -81,7 +80,7 @@ extension GossipShell.Message: Codable { try container.encode(serializedPayload.manifest, forKey: .gossip_payload_manifest) try container.encode(serializedPayload.buffer.readData(), forKey: .gossip_payload) - try container.encode(ackRef.address, forKey: .ackRef) + try container.encodeIfPresent(ackRef?.address, forKey: .ackRef) default: throw SerializationError.unableToSerialize(hint: "\(reflecting: Self.self)") diff --git a/Sources/DistributedActors/Gossip/Gossip+Settings.swift b/Sources/DistributedActors/Gossip/Gossip+Settings.swift index 5c54939c6..823b46964 100644 --- a/Sources/DistributedActors/Gossip/Gossip+Settings.swift +++ b/Sources/DistributedActors/Gossip/Gossip+Settings.swift @@ -18,27 +18,42 @@ extension Gossiper { public struct Settings { /// Interval at which gossip rounds should proceed. - public var gossipInterval: TimeAmount = .seconds(2) + /// + /// - SeeAlso: `intervalRandomFactor` + public var interval: TimeAmount /// Adds a random factor to the gossip interval, which is useful to avoid an entire cluster ticking "synchronously" /// at the same time, causing spikes in gossip traffic (as all nodes decide to gossip in the same second). /// /// - example: A random factor of `0.5` results in backoffs between 50% below and 50% above the base interval. /// - warning: MUST be between: `<0; 1>` (inclusive) - public var gossipIntervalRandomFactor: Double = 0.2 { + public var intervalRandomFactor: Double = 0.2 { willSet { precondition(newValue >= 0, "settings.crdt.gossipIntervalRandomFactor MUST BE >= 0, was: \(newValue)") precondition(newValue <= 1, "settings.crdt.gossipIntervalRandomFactor MUST BE <= 1, was: \(newValue)") } } - public var effectiveGossipInterval: TimeAmount { - let baseInterval = self.gossipInterval - let randomizeMultiplier = Double.random(in: (1 - self.gossipIntervalRandomFactor) ... (1 + self.gossipIntervalRandomFactor)) + public var effectiveInterval: TimeAmount { + let baseInterval = self.interval + let randomizeMultiplier = Double.random(in: (1 - self.intervalRandomFactor) ... (1 + self.intervalRandomFactor)) let randomizedInterval = baseInterval * randomizeMultiplier return randomizedInterval } + /// Hints the Gossiper at weather or not acknowledgments are expected or not. + /// + /// If a gossiper which does not expect acknowledgements would be about to send an ack, a warning will be logged. + public var style: GossipSpreadingStyle + public enum GossipSpreadingStyle { + /// Gossip does NOT require acknowledgements and messages will be spread using uni-directional `tell` message sends. + case unidirectional + + /// Gossip DOES expect acknowledgements for spread messages, and messages will be spread using `ask` message sends. + case acknowledged(timeout: TimeAmount) + } + + /// How the gossiper should discover peers to gossip with. public var peerDiscovery: PeerDiscovery = .manuallyIntroduced public enum PeerDiscovery { /// Peers have to be manually introduced by calling `control.introduce()` on to the gossiper. @@ -47,7 +62,7 @@ extension Gossiper { case manuallyIntroduced /// Automatically register this gossiper and subscribe for any others identifying under the same - /// `Receptionist.RegistrationKey.Message>(id)`. + /// `Receptionist.RegistrationKey.Message>(id)`. case fromReceptionistListing(id: String) /// Automatically discover and add cluster members to the gossip group when they become reachable in `atLeast` status. diff --git a/Sources/DistributedActors/Gossip/Gossip+Logic.swift b/Sources/DistributedActors/Gossip/GossipLogic.swift similarity index 53% rename from Sources/DistributedActors/Gossip/Gossip+Logic.swift rename to Sources/DistributedActors/Gossip/GossipLogic.swift index 0acc97085..6c2b507a2 100644 --- a/Sources/DistributedActors/Gossip/Gossip+Logic.swift +++ b/Sources/DistributedActors/Gossip/GossipLogic.swift @@ -38,10 +38,10 @@ import Logging /// for a nice overview of the general concepts involved in gossip algorithms. /// - SeeAlso: `Cluster.Gossip` for the Actor System's own gossip mechanism for membership dissemination public protocol GossipLogic { - associatedtype Envelope: GossipEnvelopeProtocol - typealias Context = GossipLogicContext + associatedtype Gossip: Codable + associatedtype Acknowledgement: Codable - // init(context: Context) // TODO: a form of context? + typealias Context = GossipLogicContext // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Spreading gossip @@ -50,11 +50,26 @@ public protocol GossipLogic { /// /// Useful to implement using `PeerSelection` // TODO: OrderedSet would be the right thing here to be honest... - mutating func selectPeers(peers: [AddressableActorRef]) -> [AddressableActorRef] + mutating func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] // TODO: make a directive here - /// Allows for customizing the payload for specific targets - mutating func makePayload(target: AddressableActorRef) -> Envelope? + /// Allows for customizing the payload for each of the selected peers. + /// + /// Some gossip protocols are able to specialize the gossip payload sent to a specific peer, + /// e.g. by excluding information the peer is already aware of or similar. + /// + /// Returning `nil` means that the peer will be skipped in this gossip round, even though it was a candidate selected by peer selection. + mutating func makePayload(target: AddressableActorRef) -> Gossip? + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Receiving gossip + + /// Invoked whenever a gossip message is received from another peer. + /// + /// Note that a single gossiper instance may create _multiple_ `GossipLogic` instances, + /// one for each `GossipIdentifier` it is managing. This function is guaranteed to be invoked with the + /// gossip targeted to the same gossip identity as the logic's context + mutating func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> Acknowledgement? /// Invoked when the specific gossiped payload is acknowledged by the target. /// @@ -62,36 +77,42 @@ public protocol GossipLogic { /// Eg. if gossip is sent to 2 peers, it is NOT deterministic which of the acks returns first (or at all!). /// /// - Parameters: - /// - target: The target which has acknowlaged the gossiped payload. + /// - acknowledgement: acknowledgement sent by the peer + /// - peer: The target which has acknowledged the gossiped payload. /// It corresponds to the parameter that was passed to the `makePayload(target:)` which created this gossip payload. - /// - envelope: - mutating func receivePayloadACK(target: AddressableActorRef, confirmedDeliveryOf envelope: Envelope) + /// - gossip: + mutating func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: Gossip) - // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Receiving gossip - - mutating func receiveGossip(origin: AddressableActorRef, payload: Envelope) + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Local interactions / control messages - mutating func localGossipUpdate(payload: Envelope) + mutating func receiveLocalGossipUpdate(_ gossip: Gossip) /// Extra side channel, allowing for arbitrary outside interactions with this gossip logic. - // TODO: We could consider making it typed perhaps... - mutating func receiveSideChannelMessage(message: Any) throws + mutating func receiveSideChannelMessage(_ message: Any) throws } extension GossipLogic { - public mutating func receiveSideChannelMessage(message: Any) throws { + public mutating func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: Gossip) { + // ignore by default + } + + public mutating func receiveSideChannelMessage(_ message: Any) throws { // ignore by default } } -public struct GossipLogicContext { +public struct GossipLogicContext { + /// Identifier associated with this gossip logic. + /// + /// Many gossipers only use a single identifier (and logic), + /// however some may need to manage gossip rounds for specific identifiers independently. public let gossipIdentifier: GossipIdentifier - private let ownerContext: ActorContext.Message> + private let gossiperContext: ActorContext.Message> - internal init(ownerContext: ActorContext.Message>, gossipIdentifier: GossipIdentifier) { - self.ownerContext = ownerContext + internal init(ownerContext: ActorContext.Message>, gossipIdentifier: GossipIdentifier) { + self.gossiperContext = ownerContext self.gossipIdentifier = gossipIdentifier } @@ -100,66 +121,73 @@ public struct GossipLogicContext { /// Should not be used to arbitrarily allow sending messages to the gossiper from gossip logics, /// which is why it is only an address and not full ActorRef to the gossiper. public var gossiperAddress: ActorAddress { - self.ownerContext.myself.address + self.gossiperContext.myself.address } + /// Logger associated with the owning `Gossiper`. + /// + /// Has the `gossip/identifier` of this gossip logic stored as metadata automatically. public var log: Logger { - var l = self.ownerContext.log + var l = self.gossiperContext.log l[metadataKey: "gossip/identifier"] = "\(self.gossipIdentifier)" return l } public var system: ActorSystem { - self.ownerContext.system + self.gossiperContext.system } } -public struct AnyGossipLogic: GossipLogic, CustomStringConvertible { +public struct AnyGossipLogic: GossipLogic, CustomStringConvertible { @usableFromInline let _selectPeers: ([AddressableActorRef]) -> [AddressableActorRef] @usableFromInline - let _makePayload: (AddressableActorRef) -> Envelope? - @usableFromInline - let _receivePayloadACK: (AddressableActorRef, Envelope) -> Void + let _makePayload: (AddressableActorRef) -> Gossip? @usableFromInline - let _receiveGossip: (AddressableActorRef, Envelope) -> Void + let _receiveGossip: (Gossip, AddressableActorRef) -> Acknowledgement? @usableFromInline - let _localGossipUpdate: (Envelope) -> Void + let _receiveAcknowledgement: (Acknowledgement, AddressableActorRef, Gossip) -> Void + @usableFromInline + let _receiveLocalGossipUpdate: (Gossip) -> Void @usableFromInline let _receiveSideChannelMessage: (Any) throws -> Void + public init(context: Context) { + fatalError("\(Self.self) is intended to be created with a context, use `init(logic)` instead.") + } + public init(_ logic: Logic) - where Logic: GossipLogic, Logic.Envelope == Envelope { + where Logic: GossipLogic, Logic.Gossip == Gossip, Logic.Acknowledgement == Acknowledgement { var l = logic - self._selectPeers = { l.selectPeers(peers: $0) } + self._selectPeers = { l.selectPeers($0) } self._makePayload = { l.makePayload(target: $0) } - self._receivePayloadACK = { l.receivePayloadACK(target: $0, confirmedDeliveryOf: $1) } + self._receiveGossip = { l.receiveGossip($0, from: $1) } - self._receiveGossip = { l.receiveGossip(origin: $0, payload: $1) } - self._localGossipUpdate = { l.localGossipUpdate(payload: $0) } + self._receiveAcknowledgement = { l.receiveAcknowledgement($0, from: $1, confirming: $2) } + self._receiveLocalGossipUpdate = { l.receiveLocalGossipUpdate($0) } - self._receiveSideChannelMessage = { try l.receiveSideChannelMessage(message: $0) } + self._receiveSideChannelMessage = { try l.receiveSideChannelMessage($0) } } - public func selectPeers(peers: [AddressableActorRef]) -> [AddressableActorRef] { + public func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] { self._selectPeers(peers) } - public func makePayload(target: AddressableActorRef) -> Envelope? { + public func makePayload(target: AddressableActorRef) -> Gossip? { self._makePayload(target) } - public func receivePayloadACK(target: AddressableActorRef, confirmedDeliveryOf envelope: Envelope) { - self._receivePayloadACK(target, envelope) + public func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> Acknowledgement? { + self._receiveGossip(gossip, peer) } - public func receiveGossip(origin: AddressableActorRef, payload: Envelope) { - self._receiveGossip(origin, payload) + public func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: Gossip) { + self._receiveAcknowledgement(acknowledgement, peer, gossip) } - public func localGossipUpdate(payload: Envelope) { - self._localGossipUpdate(payload) + public func receiveLocalGossipUpdate(_ gossip: Gossip) { + self._receiveLocalGossipUpdate(gossip) } public func receiveSideChannelMessage(_ message: Any) throws { @@ -167,21 +195,6 @@ public struct AnyGossipLogic: GossipLogic, Cus } public var description: String { - "GossipLogicBox<\(reflecting: Envelope.self)>(...)" + "\(reflecting: Self.self)(...)" } } - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Envelope - -public protocol GossipEnvelopeProtocol: Codable { - associatedtype Metadata - associatedtype Payload - - // Payload MAY contain the metadata, and we just expose it, or metadata is separate and we do NOT gossip it. - - var metadata: Metadata { get } - var payload: Payload { get } -} - -public struct GossipACK: Codable {} // TODO: Make Acknowlagement an associated type on GossipEnvelopeProtocol! diff --git a/Sources/DistributedActors/Gossip/Gossip+Shell+Coding.swift b/Sources/DistributedActors/Gossip/Gossiper+Shell+Serialization.swift similarity index 100% rename from Sources/DistributedActors/Gossip/Gossip+Shell+Coding.swift rename to Sources/DistributedActors/Gossip/Gossiper+Shell+Serialization.swift diff --git a/Sources/DistributedActors/Gossip/Gossip+Shell.swift b/Sources/DistributedActors/Gossip/Gossiper+Shell.swift similarity index 64% rename from Sources/DistributedActors/Gossip/Gossip+Shell.swift rename to Sources/DistributedActors/Gossip/Gossiper+Shell.swift index 68525dbb1..a0a7fc140 100644 --- a/Sources/DistributedActors/Gossip/Gossip+Shell.swift +++ b/Sources/DistributedActors/Gossip/Gossiper+Shell.swift @@ -16,24 +16,26 @@ import Logging private let gossipTickKey: TimerKey = "gossip-tick" -/// Convergent gossip is a gossip mechanism which aims to equalize some state across all peers participating. -internal final class GossipShell { +/// :nodoc: +/// +/// Not intended to be spawned directly, use `Gossiper.spawn` instead! +internal final class GossipShell { typealias Ref = ActorRef let settings: Gossiper.Settings - private let makeLogic: (ActorContext, GossipIdentifier) -> AnyGossipLogic + private let makeLogic: (ActorContext, GossipIdentifier) -> AnyGossipLogic /// Payloads to be gossiped on gossip rounds - private var gossipLogics: [AnyGossipIdentifier: AnyGossipLogic] + private var gossipLogics: [AnyGossipIdentifier: AnyGossipLogic] typealias PeerRef = ActorRef private var peers: Set - fileprivate init( + internal init( settings: Gossiper.Settings, makeLogic: @escaping (Logic.Context) -> Logic - ) where Logic: GossipLogic, Logic.Envelope == Envelope { + ) where Logic: GossipLogic, Logic.Gossip == Gossip, Logic.Acknowledgement == Acknowledgement { self.settings = settings self.makeLogic = { shellContext, id in let logicContext = GossipLogicContext(ownerContext: shellContext, gossipIdentifier: id) @@ -90,8 +92,8 @@ internal final class GossipShell { _ context: ActorContext, identifier: GossipIdentifier, origin: ActorRef, - payload: Envelope, - ackRef: ActorRef + payload: Gossip, + ackRef: ActorRef? ) { context.log.trace("Received gossip [\(identifier.gossipIdentifier)]", metadata: [ "gossip/identity": "\(identifier.gossipIdentifier)", @@ -99,34 +101,60 @@ internal final class GossipShell { "gossip/incoming": Logger.MetadataValue.pretty(payload), ]) - // TODO: we could handle some actions if it issued some - let logic: AnyGossipLogic = self.getEnsureLogic(context, identifier: identifier) + let logic = self.getEnsureLogic(context, identifier: identifier) + + let ack: Acknowledgement? = logic.receiveGossip(payload, from: origin.asAddressable()) - // TODO: we could handle directives from the logic - logic.receiveGossip(origin: origin.asAddressable(), payload: payload) + switch self.settings.style { + case .acknowledged: + if let ack = ack { + ackRef?.tell(ack) + } - ackRef.tell(.init()) // TODO: allow the user to return an ACK from receiveGossip + case .unidirectional: + if let unexpectedAck = ack { + context.log.warning( + """ + GossipLogic attempted to offer Acknowledgement while it is configured as .unidirectional!\ + This is potentially a bug in the logic or the Gossiper's configuration. Dropping acknowledgement. + """, metadata: [ + "gossip/identity": "\(identifier.gossipIdentifier)", + "gossip/origin": "\(origin.address)", + "gossip/ack": "\(unexpectedAck)", + ] + ) + } + if let unexpectedAckRef = ackRef { + context.log.warning( + """ + Incoming gossip has acknowledgement actor ref and seems to be expecting an ACK, while this gossiper is configured as .unidirectional! \ + This is potentially a bug in the logic or the Gossiper's configuration. + """, metadata: [ + "gossip/identity": "\(identifier.gossipIdentifier)", + "gossip/origin": "\(origin.address)", + "gossip/ackRef": "\(unexpectedAckRef)", + ] + ) + } + } } private func onLocalPayloadUpdate( _ context: ActorContext, identifier: GossipIdentifier, - payload: Envelope + payload: Gossip ) { let logic = self.getEnsureLogic(context, identifier: identifier) - logic.localGossipUpdate(payload: payload) - - context.log.trace("Gossip payload [\(identifier.gossipIdentifier)] (locally) updated", metadata: [ + context.log.trace("Update (locally) gossip payload [\(identifier.gossipIdentifier)]", metadata: [ "gossip/identifier": "\(identifier.gossipIdentifier)", "gossip/payload": "\(pretty: payload)", ]) - - // TODO: bump local version vector; once it is in the envelope + logic.receiveLocalGossipUpdate(payload) } - private func getEnsureLogic(_ context: ActorContext, identifier: GossipIdentifier) -> AnyGossipLogic { - let logic: AnyGossipLogic + private func getEnsureLogic(_ context: ActorContext, identifier: GossipIdentifier) -> AnyGossipLogic { + let logic: AnyGossipLogic if let existing = self.gossipLogics[identifier.asAnyGossipIdentifier] { logic = existing } else { @@ -161,7 +189,7 @@ internal final class GossipShell { } for (identifier, logic) in self.gossipLogics { - let selectedPeers = logic.selectPeers(peers: allPeers) // TODO: OrderedSet would be the right thing here... + let selectedPeers = logic.selectPeers(allPeers) // TODO: OrderedSet would be the right thing here... context.log.trace("New gossip round, selected [\(selectedPeers.count)] peers, from [\(allPeers.count)] peers", metadata: [ "gossip/id": "\(identifier.gossipIdentifier)", @@ -169,7 +197,7 @@ internal final class GossipShell { ]) for selectedPeer in selectedPeers { - guard let payload: Envelope = logic.makePayload(target: selectedPeer) else { + guard let gossip: Gossip = logic.makePayload(target: selectedPeer) else { context.log.trace("Skipping gossip to peer \(selectedPeer)", metadata: [ "gossip/id": "\(identifier.gossipIdentifier)", "gossip/target": "\(selectedPeer)", @@ -187,15 +215,8 @@ internal final class GossipShell { continue } -// pprint(""" -// [\(context.system.cluster.node)] \ -// Selected [\(selectedPeers.count)] peers, \ -// from [\(allPeers.count)] peers: \(selectedPeers)\ -// PAYLOAD: \(pretty: payload) -// """) - - self.sendGossip(context, identifier: identifier, payload, to: selectedRef, onAck: { - logic.receivePayloadACK(target: selectedPeer, confirmedDeliveryOf: payload) + self.sendGossip(context, identifier: identifier, gossip, to: selectedRef, onGossipAck: { ack in + logic.receiveAcknowledgement(ack, from: selectedPeer, confirming: gossip) }) } @@ -207,9 +228,9 @@ internal final class GossipShell { private func sendGossip( _ context: ActorContext, identifier: AnyGossipIdentifier, - _ payload: Envelope, + _ payload: Gossip, to target: PeerRef, - onAck: @escaping () -> Void + onGossipAck: @escaping (Acknowledgement) -> Void ) { context.log.trace("Sending gossip to \(target.address)", metadata: [ "gossip/target": "\(target.address)", @@ -217,21 +238,26 @@ internal final class GossipShell { "actor/message": Logger.MetadataValue.pretty(payload), ]) - let ack = target.ask(for: GossipACK.self, timeout: .seconds(3)) { replyTo in - Message.gossip(identity: identifier.underlying, origin: context.myself, payload, ackRef: replyTo) - } + switch self.settings.style { + case .unidirectional: + target.tell(Message.gossip(identity: identifier.underlying, origin: context.myself, payload, ackRef: nil)) + case .acknowledged(let timeout): + let ack = target.ask(for: Acknowledgement.self, timeout: timeout) { replyTo in + Message.gossip(identity: identifier.underlying, origin: context.myself, payload, ackRef: replyTo) + } - context.onResultAsync(of: ack, timeout: .effectivelyInfinite) { res in - switch res { - case .success(let ack): - context.log.trace("Gossip ACKed", metadata: [ - "gossip/ack": "\(ack)", - ]) - onAck() - case .failure: - context.log.warning("Failed to ACK delivery [\(identifier.gossipIdentifier)] gossip \(payload) to \(target)") + context.onResultAsync(of: ack, timeout: .effectivelyInfinite) { res in + switch res { + case .success(let ack): + context.log.trace("Gossip ACKed", metadata: [ + "gossip/ack": "\(ack)", + ]) + onGossipAck(ack) + case .failure: + context.log.warning("Failed to ACK delivery [\(identifier.gossipIdentifier)] gossip \(payload) to \(target)") + } + return .same } - return .same } } @@ -240,8 +266,8 @@ internal final class GossipShell { return // no need to schedule gossip ticks if we have no peers } - let delay = self.settings.effectiveGossipInterval - context.log.trace("Schedule next gossip round in \(delay.prettyDescription) (\(self.settings.gossipInterval.prettyDescription) ± \(self.settings.gossipIntervalRandomFactor * 100)%)") + let delay = self.settings.effectiveInterval + context.log.trace("Schedule next gossip round in \(delay.prettyDescription) (\(self.settings.interval.prettyDescription) ± \(self.settings.intervalRandomFactor * 100)%)") context.timers.startSingle(key: gossipTickKey, message: ._periodicGossipTick, delay: delay) } } @@ -271,6 +297,11 @@ extension GossipShell { let resolved: AddressableActorRef = resolvePeerOn(member) if let peer = resolved.ref as? PeerRef { + // We MUST always watch all peers we gossip with, as if they (or their nodes) were to terminate + // they MUST be removed from the peer list we offer to gossip logics. Otherwise a naive gossip logic + // may continue trying to gossip with that peer. + context.watch(peer) + if self.peers.insert(peer).inserted { context.log.debug("Automatically discovered peer", metadata: [ "gossip/peer": "\(peer)", @@ -361,10 +392,10 @@ extension GossipShell { extension GossipShell { enum Message { // gossip - case gossip(identity: GossipIdentifier, origin: ActorRef, Envelope, ackRef: ActorRef) + case gossip(identity: GossipIdentifier, origin: ActorRef, Gossip, ackRef: ActorRef?) // local messages - case updatePayload(identifier: GossipIdentifier, Envelope) + case updatePayload(identifier: GossipIdentifier, Gossip) case removePayload(identifier: GossipIdentifier) case introducePeer(PeerRef) @@ -374,124 +405,3 @@ extension GossipShell { case _periodicGossipTick } } - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Gossiper - -/// A Gossiper -public enum Gossiper { - /// Spawns a gossip actor, that will periodically gossip with its peers about the provided payload. - static func start( - _ context: ActorRefFactory, name naming: ActorNaming, - of type: Envelope.Type = Envelope.self, - props: Props = .init(), - settings: Settings = .init(), - makeLogic: @escaping (Logic.Context) -> Logic - ) throws -> GossipControl - where Logic: GossipLogic, Logic.Envelope == Envelope { - let ref = try context.spawn( - naming, - of: GossipShell.Message.self, - props: props, - file: #file, line: #line, - GossipShell(settings: settings, makeLogic: makeLogic).behavior - ) - return GossipControl(ref) - } -} - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: GossipControl - -internal struct GossipControl { - private let ref: GossipShell.Ref - - init(_ ref: GossipShell.Ref) { - self.ref = ref - } - - /// Introduce a peer to the gossip group - func introduce(peer: GossipShell.Ref) { - self.ref.tell(.introducePeer(peer)) - } - - // FIXME: is there some way to express that actually, Metadata is INSIDE Payload so I only want to pass the "envelope" myself...? - func update(_ identifier: GossipIdentifier, payload: GossipEnvelope) { - self.ref.tell(.updatePayload(identifier: identifier, payload)) - } - - func remove(_ identifier: GossipIdentifier) { - self.ref.tell(.removePayload(identifier: identifier)) - } - - /// Side channel messages which may be piped into specific gossip logics. - func sideChannelTell(_ identifier: GossipIdentifier, message: Any) { - self.ref.tell(.sideChannelMessage(identifier: identifier, message)) - } -} - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Gossip Identifier - -/// Used to identify which identity a payload is tied with. -/// E.g. it could be used to mark the CRDT instance the gossip is carrying, or which "entity" a gossip relates to. -// FIXME: just force GossipIdentifier to be codable, avoid this hacky dance? -public protocol GossipIdentifier { - var gossipIdentifier: String { get } - - init(_ gossipIdentifier: String) - - var asAnyGossipIdentifier: AnyGossipIdentifier { get } -} - -public struct AnyGossipIdentifier: Hashable, GossipIdentifier { - public let underlying: GossipIdentifier - - public init(_ id: String) { - self.underlying = StringGossipIdentifier(stringLiteral: id) - } - - public init(_ identifier: GossipIdentifier) { - if let any = identifier as? AnyGossipIdentifier { - self = any - } else { - self.underlying = identifier - } - } - - public var gossipIdentifier: String { - self.underlying.gossipIdentifier - } - - public var asAnyGossipIdentifier: AnyGossipIdentifier { - self - } - - public func hash(into hasher: inout Hasher) { - self.underlying.gossipIdentifier.hash(into: &hasher) - } - - public static func == (lhs: AnyGossipIdentifier, rhs: AnyGossipIdentifier) -> Bool { - lhs.underlying.gossipIdentifier == rhs.underlying.gossipIdentifier - } -} - -public struct StringGossipIdentifier: GossipIdentifier, Hashable, ExpressibleByStringLiteral, CustomStringConvertible { - public let gossipIdentifier: String - - public init(_ gossipIdentifier: StringLiteralType) { - self.gossipIdentifier = gossipIdentifier - } - - public init(stringLiteral gossipIdentifier: StringLiteralType) { - self.gossipIdentifier = gossipIdentifier - } - - public var asAnyGossipIdentifier: AnyGossipIdentifier { - AnyGossipIdentifier(self) - } - - public var description: String { - "StringGossipIdentifier(\(self.gossipIdentifier))" - } -} diff --git a/Sources/DistributedActors/Gossip/Gossiper.swift b/Sources/DistributedActors/Gossip/Gossiper.swift new file mode 100644 index 000000000..cabc68525 --- /dev/null +++ b/Sources/DistributedActors/Gossip/Gossiper.swift @@ -0,0 +1,150 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Gossiper + +/// A generalized Gossiper which can interpret a `GossipLogic` provided to it. +/// +/// It encapsulates multiple error prone details surrounding implementing gossip mechanisms, +/// such as peer monitoring and managing cluster events and their impact on peers. +/// +/// It can automatically discover new peers as new members join the cluster using the `Receptionist`. +/// +/// - SeeAlso: [Gossiping in Distributed Systems](https://www.distributed-systems.net/my-data/papers/2007.osr.pdf) (Anne-Marie Kermarrec, Maarten van Steen), +/// for a nice overview of the general concepts involved in gossip algorithms. +/// - SeeAlso: [Cassandra Internals — Understanding Gossip](https://www.youtube.com/watch?v=FuP1Fvrv6ZQ) which a nice generally useful talk +public enum Gossiper { + /// Spawns a gossip actor, that will periodically gossip with its peers about the provided payload. + static func spawn( + _ context: ActorRefFactory, + name naming: ActorNaming, + settings: Settings, + props: Props = .init(), + makeLogic: @escaping (Logic.Context) -> Logic + ) throws -> GossiperControl + where Logic: GossipLogic, Logic.Gossip == Envelope, Logic.Acknowledgement == Acknowledgement { + let ref = try context.spawn( + naming, + of: GossipShell.Message.self, + props: props, + file: #file, line: #line, + GossipShell(settings: settings, makeLogic: makeLogic).behavior + ) + return GossiperControl(ref) + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: GossiperControl + +/// Control object used to modify and interact with a spawned `Gossiper`. +public struct GossiperControl { + /// Internal FOR TESTING ONLY. + internal let ref: GossipShell.Ref + + init(_ ref: GossipShell.Ref) { + self.ref = ref + } + + /// Introduce a peer to the gossip group. + /// + /// This method is fairly manual and error prone and as such internal only for the time being. + /// Please use the receptionist based peer discovery instead. + internal func introduce(peer: GossipShell.Ref) { + self.ref.tell(.introducePeer(peer)) + } + + // FIXME: is there some way to express that actually, Metadata is INSIDE Payload so I only want to pass the "envelope" myself...? + public func update(_ identifier: GossipIdentifier, payload: Gossip) { + self.ref.tell(.updatePayload(identifier: identifier, payload)) + } + + public func remove(_ identifier: GossipIdentifier) { + self.ref.tell(.removePayload(identifier: identifier)) + } + + /// Side channel messages which may be piped into specific gossip logics. + public func sideChannelTell(_ identifier: GossipIdentifier, message: Any) { + self.ref.tell(.sideChannelMessage(identifier: identifier, message)) + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Gossip Identifier + +/// Used to identify which identity a payload is tied with. +/// E.g. it could be used to mark the CRDT instance the gossip is carrying, or which "entity" a gossip relates to. +// FIXME: just force GossipIdentifier to be codable, avoid this hacky dance? +public protocol GossipIdentifier { + var gossipIdentifier: String { get } + + init(_ gossipIdentifier: String) + + var asAnyGossipIdentifier: AnyGossipIdentifier { get } +} + +public struct AnyGossipIdentifier: Hashable, GossipIdentifier { + public let underlying: GossipIdentifier + + public init(_ id: String) { + self.underlying = StringGossipIdentifier(stringLiteral: id) + } + + public init(_ identifier: GossipIdentifier) { + if let any = identifier as? AnyGossipIdentifier { + self = any + } else { + self.underlying = identifier + } + } + + public var gossipIdentifier: String { + self.underlying.gossipIdentifier + } + + public var asAnyGossipIdentifier: AnyGossipIdentifier { + self + } + + public func hash(into hasher: inout Hasher) { + self.underlying.gossipIdentifier.hash(into: &hasher) + } + + public static func == (lhs: AnyGossipIdentifier, rhs: AnyGossipIdentifier) -> Bool { + lhs.underlying.gossipIdentifier == rhs.underlying.gossipIdentifier + } +} + +public struct StringGossipIdentifier: GossipIdentifier, Hashable, ExpressibleByStringLiteral, CustomStringConvertible { + public let gossipIdentifier: String + + public init(_ gossipIdentifier: StringLiteralType) { + self.gossipIdentifier = gossipIdentifier + } + + public init(stringLiteral gossipIdentifier: StringLiteralType) { + self.gossipIdentifier = gossipIdentifier + } + + public var asAnyGossipIdentifier: AnyGossipIdentifier { + AnyGossipIdentifier(self) + } + + public var description: String { + "StringGossipIdentifier(\(self.gossipIdentifier))" + } +} diff --git a/Sources/DistributedActors/Gossip/PeerSelection.swift b/Sources/DistributedActors/Gossip/PeerSelection.swift index f82621929..7c2d1b981 100644 --- a/Sources/DistributedActors/Gossip/PeerSelection.swift +++ b/Sources/DistributedActors/Gossip/PeerSelection.swift @@ -21,7 +21,7 @@ /// // TODO: implement SWIMs selection in terms of this public protocol PeerSelection { associatedtype Peer: Hashable - typealias Peers = Array.SubSequence + typealias Peers = [Peer] func onMembershipEvent(event: Cluster.Event) diff --git a/Sources/DistributedActors/Serialization/Serialization.swift b/Sources/DistributedActors/Serialization/Serialization.swift index 4cb3126e0..05e869bc4 100644 --- a/Sources/DistributedActors/Serialization/Serialization.swift +++ b/Sources/DistributedActors/Serialization/Serialization.swift @@ -115,8 +115,8 @@ public class Serialization { // cluster settings.register(ClusterShell.Message.self) settings.register(Cluster.Event.self) - settings.register(Cluster.Gossip.self) - settings.register(GossipShell.Message.self) + settings.register(Cluster.MembershipGossip.self) + settings.register(GossipShell.Message.self) settings.register(StringGossipIdentifier.self) // receptionist needs some special casing @@ -166,10 +166,9 @@ public class Serialization { settings.register(CRDT.GCounterDelta.self, serializerID: Serialization.ReservedID.CRDTGCounterDelta) // crdt gossip - settings.register(GossipACK.self) - settings.register(GossipShell.Message.self) // TODO: remove this, workaround since we ust strings rather than mangled names today + settings.register(CRDT.GossipAck.self) + settings.register(GossipShell.Message.self) settings.register(CRDT.Gossip.self) - settings.register(CRDT.Gossip.Metadata.self) // errors settings.register(ErrorEnvelope.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands diff --git a/Sources/DistributedActorsTestKit/ActorTestKit.swift b/Sources/DistributedActorsTestKit/ActorTestKit.swift index 5d88e729f..68cff3f90 100644 --- a/Sources/DistributedActorsTestKit/ActorTestKit.swift +++ b/Sources/DistributedActorsTestKit/ActorTestKit.swift @@ -237,7 +237,11 @@ public struct EventuallyError: Error, CustomStringConvertible, CustomDebugString } public var debugDescription: String { - "EventuallyError(callSite: \(self.callSite), timeAmount: \(self.timeAmount), polledTimes: \(self.polledTimes), lastError: \(optional: self.lastError))" + let error = self.callSite.error( + """ + Eventually block failed, after \(self.timeAmount) (polled \(self.polledTimes) times), last error: \(optional: self.lastError) + """) + return "\(error)" } } @@ -328,14 +332,14 @@ extension ActorTestKit { public extension ActorTestKit { /// Creates a _fake_ `ActorContext` which can be used to pass around to fulfil type argument requirements, /// however it DOES NOT have the ability to perform any of the typical actor context actions (such as spawning etc). - func makeFakeContext(forType: M.Type = M.self) -> ActorContext { + func makeFakeContext(of: M.Type = M.self) -> ActorContext { MockActorContext(self.system) } /// Creates a _fake_ `ActorContext` which can be used to pass around to fulfil type argument requirements, /// however it DOES NOT have the ability to perform any of the typical actor context actions (such as spawning etc). func makeFakeContext(for: Behavior) -> ActorContext { - self.makeFakeContext(forType: M.self) + self.makeFakeContext(of: M.self) } } diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift index 14813bb5e..728256c21 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift @@ -60,16 +60,18 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { /// Set up a new node intended to be clustered. open func setUpNode(_ name: String, _ modifySettings: ((inout ActorSystemSettings) -> Void)? = nil) -> ActorSystem { - var captureSettings = LogCapture.Settings() - self.configureLogCapture(settings: &captureSettings) - let capture = LogCapture(settings: captureSettings) - let node = ActorSystem(name) { settings in settings.cluster.enabled = true settings.cluster.node.port = self.nextPort() if self.captureLogs { + var captureSettings = LogCapture.Settings() + self.configureLogCapture(settings: &captureSettings) + let capture = LogCapture(settings: captureSettings) + settings.logging.logger = capture.logger(label: name) + + self._logCaptures.append(capture) } settings.cluster.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2) @@ -85,9 +87,6 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { self._nodes.append(node) self._testKits.append(.init(node)) - if self.captureLogs { - self._logCaptures.append(capture) - } return node } @@ -124,7 +123,8 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { public func joinNodes( node: ActorSystem, with other: ActorSystem, - ensureWithin: TimeAmount? = nil, ensureMembers maybeExpectedStatus: Cluster.MemberStatus? = nil + ensureWithin: TimeAmount? = nil, ensureMembers maybeExpectedStatus: Cluster.MemberStatus? = nil, + file: StaticString = #file, line: UInt = #line ) throws { node.cluster.join(node: other.cluster.node.node) @@ -133,9 +133,9 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { if let expectedStatus = maybeExpectedStatus { if let specificTimeout = ensureWithin { - try self.ensureNodes(expectedStatus, on: node, within: specificTimeout, nodes: other.cluster.node) + try self.ensureNodes(expectedStatus, on: node, within: specificTimeout, nodes: other.cluster.node, file: file, line: line) } else { - try self.ensureNodes(expectedStatus, on: node, nodes: other.cluster.node) + try self.ensureNodes(expectedStatus, on: node, nodes: other.cluster.node, file: file, line: line) } } } diff --git a/Sources/DistributedActorsTestKit/LogCapture.swift b/Sources/DistributedActorsTestKit/LogCapture.swift index 638b31fff..9282d00ea 100644 --- a/Sources/DistributedActorsTestKit/LogCapture.swift +++ b/Sources/DistributedActorsTestKit/LogCapture.swift @@ -22,7 +22,6 @@ import XCTest /// /// ### Warning /// This handler uses locks for each and every operation. -// TODO: the implementation is quite incomplete and does not allow inspecting metadata setting etc. public final class LogCapture { private var _logs: [CapturedLogMessage] = [] private let lock = DistributedActorsConcurrencyHelpers.Lock() @@ -84,6 +83,11 @@ extension LogCapture { public var excludeGrep: Set = [] public var grep: Set = [] + public var ignoredMetadata: Set = [ + "actor/node", + "actor/nodeName", + ] + public init() {} } } @@ -110,8 +114,9 @@ extension LogCapture { } metadata.removeValue(forKey: "label") - metadata.removeValue(forKey: "actor/node") - metadata.removeValue(forKey: "actor/nodeName") + self.settings.ignoredMetadata.forEach { ignoreKey in + metadata.removeValue(forKey: ignoreKey) + } if !metadata.isEmpty { metadataString = "\n// metadata:\n" for key in metadata.keys.sorted() { diff --git a/Sources/DistributedActorsTestKit/ShouldMatchers.swift b/Sources/DistributedActorsTestKit/ShouldMatchers.swift index bc070a714..83f5b5bcf 100644 --- a/Sources/DistributedActorsTestKit/ShouldMatchers.swift +++ b/Sources/DistributedActorsTestKit/ShouldMatchers.swift @@ -43,13 +43,17 @@ public struct TestMatchers { public extension TestMatchers where T: Equatable { func toEqual(_ expected: T) { - let error = self.callSite.notEqualError(got: self.it, expected: expected) - XCTAssertEqual(self.it, expected, "\(error)", file: self.callSite.file, line: self.callSite.line) + if self.it != expected { + let error = self.callSite.notEqualError(got: self.it, expected: expected) + XCTFail("\(error)", file: self.callSite.file, line: self.callSite.line) + } } func toNotEqual(_ unexpectedEqual: T) { - let error = self.callSite.equalError(got: self.it, unexpectedEqual: unexpectedEqual) - XCTAssertNotEqual(self.it, unexpectedEqual, "\(error)", file: self.callSite.file, line: self.callSite.line) + if self.it == unexpectedEqual { + let error = self.callSite.equalError(got: self.it, unexpectedEqual: unexpectedEqual) + XCTFail("\(error)", file: self.callSite.file, line: self.callSite.line) + } } func toBe(_ expected: Other.Type) { diff --git a/Tests/DistributedActorsTests/ActorSystem+Testing.swift b/Tests/DistributedActorsTests/ActorSystem+Testing.swift index 3f0857642..01499a5fe 100644 --- a/Tests/DistributedActorsTests/ActorSystem+Testing.swift +++ b/Tests/DistributedActorsTests/ActorSystem+Testing.swift @@ -34,13 +34,14 @@ extension ActorSystem { /// Internal utility to create "known remote ref" on known target system. /// Real applications should never do this, and instead rely on the `Receptionist` to discover references. func _resolveKnownRemote(_ ref: ActorRef, onRemoteSystem remote: ActorSystem) -> ActorRef { - guard self._cluster != nil else { - fatalError("system must be clustered to allow resolving a remote ref.") - } + self._resolveKnownRemote(ref, onRemoteNode: remote.cluster.node) + } + + func _resolveKnownRemote(_ ref: ActorRef, onRemoteNode remoteNode: UniqueNode) -> ActorRef { guard let shell = self._cluster else { - fatalError("system._cluster shell must be available, was the resolve invoked too early (before system startup completed)?") + fatalError("Actor System must have clustering enabled to allow resolving remote actors") } - let remoteAddress = ActorAddress(node: remote.settings.cluster.uniqueBindNode, path: ref.path, incarnation: ref.address.incarnation) + let remoteAddress = ActorAddress(node: remoteNode, path: ref.path, incarnation: ref.address.incarnation) return ActorRef(.remote(RemoteClusterActorPersonality(shell: shell, address: remoteAddress, system: self))) } } diff --git a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift index 6dee70af3..55fdf3464 100644 --- a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift @@ -158,10 +158,11 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase { alone.cluster.join(node: alone.cluster.node.node) // "self join", should simply be ignored let testKit = self.testKit(alone) - - sleep(1) try testKit.eventually(within: .seconds(3)) { - alone.cluster.membershipSnapshot.count.shouldEqual(1) + let snapshot: Cluster.Membership = alone.cluster.membershipSnapshot + if snapshot.count != 1 { + throw TestError("Expected membership to include self node, was: \(snapshot)") + } } } diff --git a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift index 1e5e5bf31..78ca910cc 100644 --- a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift +++ b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsTests.swift @@ -149,7 +149,7 @@ final class ClusterLeaderActionsTests: XCTestCase { } @discardableResult - private func gossip(from: ClusterShellState, to: inout ClusterShellState) -> Cluster.Gossip.MergeDirective { + private func gossip(from: ClusterShellState, to: inout ClusterShellState) -> Cluster.MembershipGossip.MergeDirective { to.latestGossip.mergeForward(incoming: from.latestGossip) } diff --git a/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift index d9f55fa77..71d1b7876 100644 --- a/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift @@ -279,7 +279,7 @@ final class DowningClusteredTests: ClusteredActorSystemsXCTestCase { for remainingNode in nodes { let probe = probes[remainingNode.cluster.node]! - let events = try probe.fishFor(Cluster.MembershipChange.self, within: .seconds(60), expectedDownMemberEventsFishing(on: remainingNode)) + let events = try probe.fishFor(Cluster.MembershipChange.self, within: .seconds(120), expectedDownMemberEventsFishing(on: remainingNode)) events.shouldContain(where: { change in change.toStatus.isDown && (change.fromStatus == .joining || change.fromStatus == .up) }) for expectedDownNode in nodesToDown { diff --git a/Tests/DistributedActorsTests/Cluster/GossipSeenTableTests.swift b/Tests/DistributedActorsTests/Cluster/GossipSeenTableTests.swift index 19b0b7a87..9560db733 100644 --- a/Tests/DistributedActorsTests/Cluster/GossipSeenTableTests.swift +++ b/Tests/DistributedActorsTests/Cluster/GossipSeenTableTests.swift @@ -19,7 +19,7 @@ import XCTest /// Tests of just the datatype final class GossipSeenTableTests: XCTestCase { - typealias SeenTable = Cluster.Gossip.SeenTable + typealias SeenTable = Cluster.MembershipGossip.SeenTable var nodeA: UniqueNode! var nodeB: UniqueNode! @@ -37,13 +37,13 @@ final class GossipSeenTableTests: XCTestCase { } func test_seenTable_compare_concurrent_eachOtherDontKnown() { - let table = Cluster.Gossip.SeenTable.parse( + let table = Cluster.MembershipGossip.SeenTable.parse( """ A: A@1 """, nodes: self.allNodes ) - let incoming = Cluster.Gossip.SeenTable.parse( + let incoming = Cluster.MembershipGossip.SeenTable.parse( """ B: B@1 """, nodes: self.allNodes @@ -61,7 +61,7 @@ final class GossipSeenTableTests: XCTestCase { // MARK: increments func test_incrementVersion() { - var table = Cluster.Gossip.SeenTable(myselfNode: self.nodeA, version: .init()) + var table = Cluster.MembershipGossip.SeenTable(myselfNode: self.nodeA, version: .init()) table.version(at: self.nodeA).shouldEqual(VersionVector.parse("", nodes: self.allNodes)) table.incrementVersion(owner: self.nodeA, at: self.nodeA) @@ -83,13 +83,13 @@ final class GossipSeenTableTests: XCTestCase { // MARK: merge func test_seenTable_merge_notYetSeenInformation() { - var table = Cluster.Gossip.SeenTable.parse( + var table = Cluster.MembershipGossip.SeenTable.parse( """ A: A:1 """, nodes: self.allNodes ) - let incoming = Cluster.Gossip.SeenTable.parse( + let incoming = Cluster.MembershipGossip.SeenTable.parse( """ B: B:2 """, nodes: self.allNodes @@ -106,12 +106,12 @@ final class GossipSeenTableTests: XCTestCase { func test_seenTable_merge_sameInformation() { // a situation in which the two nodes have converged, so their versions are .same - var table = Cluster.Gossip.SeenTable(myselfNode: self.nodeA, version: .init()) + var table = Cluster.MembershipGossip.SeenTable(myselfNode: self.nodeA, version: .init()) table.incrementVersion(owner: self.nodeA, at: self.nodeA) // A observed: A:1 table.incrementVersion(owner: self.nodeA, at: self.nodeB) // A observed: A:1 B:1 table.incrementVersion(owner: self.nodeA, at: self.nodeB) // A observed: A:1 B:2 - var incoming = Cluster.Gossip(ownerNode: self.nodeB) // B observed: + var incoming = Cluster.MembershipGossip(ownerNode: self.nodeB) // B observed: incoming.incrementOwnerVersion() // B observed: B:1 incoming.incrementOwnerVersion() // B observed: B:2 incoming.seen.incrementVersion(owner: self.nodeB, at: self.nodeA) // B observed: A:1 B:2 @@ -125,10 +125,10 @@ final class GossipSeenTableTests: XCTestCase { func test_seenTable_merge_aheadInformation() { // the incoming gossip is "ahead" and has some more information - var table = Cluster.Gossip.SeenTable(myselfNode: self.nodeA, version: .init()) + var table = Cluster.MembershipGossip.SeenTable(myselfNode: self.nodeA, version: .init()) table.incrementVersion(owner: self.nodeA, at: self.nodeA) // A observed: A:1 - var incoming = Cluster.Gossip(ownerNode: self.nodeB) // B observed: + var incoming = Cluster.MembershipGossip(ownerNode: self.nodeB) // B observed: incoming.incrementOwnerVersion() // B observed: B:1 incoming.incrementOwnerVersion() // B observed: B:2 incoming.seen.incrementVersion(owner: self.nodeB, at: self.nodeA) // B observed: A:1 B:2 @@ -142,12 +142,12 @@ final class GossipSeenTableTests: XCTestCase { func test_seenTable_merge_behindInformation() { // the incoming gossip is "behind" - var table = Cluster.Gossip.SeenTable(myselfNode: self.nodeA, version: .init()) + var table = Cluster.MembershipGossip.SeenTable(myselfNode: self.nodeA, version: .init()) table.incrementVersion(owner: self.nodeA, at: self.nodeA) // A observed: A:1 table.incrementVersion(owner: self.nodeA, at: self.nodeB) // A observed: A:1 B:1 table.incrementVersion(owner: self.nodeA, at: self.nodeB) // A observed: A:1 B:2 - var incoming = Cluster.Gossip(ownerNode: self.nodeB) // B observed: + var incoming = Cluster.MembershipGossip(ownerNode: self.nodeB) // B observed: incoming.incrementOwnerVersion() // B observed: B:1 incoming.incrementOwnerVersion() // B observed: B:2 @@ -160,7 +160,7 @@ final class GossipSeenTableTests: XCTestCase { func test_seenTable_merge_concurrentInformation() { // the incoming gossip is "concurrent" - var table = Cluster.Gossip.SeenTable(myselfNode: self.nodeA, version: .init()) + var table = Cluster.MembershipGossip.SeenTable(myselfNode: self.nodeA, version: .init()) table.incrementVersion(owner: self.nodeA, at: self.nodeA) // A observed: A:1 table.incrementVersion(owner: self.nodeA, at: self.nodeB) // A observed: A:1 B:1 table.incrementVersion(owner: self.nodeA, at: self.nodeB) // A observed: A:1 B:2 @@ -170,7 +170,7 @@ final class GossipSeenTableTests: XCTestCase { table.incrementVersion(owner: self.nodeB, at: self.nodeB) // B observed: B:3 // in reality S is quite more far ahead, already at t=4 - var incoming = Cluster.Gossip(ownerNode: self.nodeB) // B observed + var incoming = Cluster.MembershipGossip(ownerNode: self.nodeB) // B observed incoming.incrementOwnerVersion() // B observed: B:1 incoming.incrementOwnerVersion() // B observed: B:2 incoming.incrementOwnerVersion() // B observed: B:3 @@ -185,13 +185,13 @@ final class GossipSeenTableTests: XCTestCase { func test_seenTable_merge_concurrentInformation_unknownMember() { // the incoming gossip is "concurrent", and has a table entry for a node we don't know - var table = Cluster.Gossip.SeenTable.parse( + var table = Cluster.MembershipGossip.SeenTable.parse( """ A: A:4 """, nodes: self.allNodes ) - let incoming = Cluster.Gossip.SeenTable.parse( + let incoming = Cluster.MembershipGossip.SeenTable.parse( """ A: A:1 B: B:2 C:1 @@ -210,7 +210,7 @@ final class GossipSeenTableTests: XCTestCase { // MARK: Prune func test_prune_removeNodeFromSeenTable() { - var table = Cluster.Gossip.SeenTable(myselfNode: self.nodeA, version: .init()) + var table = Cluster.MembershipGossip.SeenTable(myselfNode: self.nodeA, version: .init()) table.incrementVersion(owner: self.nodeA, at: self.nodeA) table.incrementVersion(owner: self.nodeA, at: self.nodeC) diff --git a/Tests/DistributedActorsTests/Cluster/MembershipGossipLogicSimulationTests.swift b/Tests/DistributedActorsTests/Cluster/MembershipGossipLogicSimulationTests.swift new file mode 100644 index 000000000..6010bd89d --- /dev/null +++ b/Tests/DistributedActorsTests/Cluster/MembershipGossipLogicSimulationTests.swift @@ -0,0 +1,447 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import DistributedActors +import DistributedActorsTestKit +import Logging +import NIO +import XCTest + +final class MembershipGossipLogicSimulationTests: ClusteredActorSystemsXCTestCase { + override func configureActorSystem(settings: inout ActorSystemSettings) { + settings.cluster.enabled = false // not actually clustering, just need a few nodes + } + + override func configureLogCapture(settings: inout LogCapture.Settings) { + settings.filterActorPaths = [ + "/user/peer", + ] + } + + var systems: [ActorSystem] { + self._nodes + } + + func system(_ id: String) -> ActorSystem { + self.systems.first(where: { $0.name == id })! + } + + var nodes: [UniqueNode] { + self._nodes.map { $0.cluster.node } + } + + var mockPeers: [AddressableActorRef] = [] + + var testKit: ActorTestKit { + self.testKit(self.systems.first!) + } + + var logics: [MembershipGossipLogic] = [] + + func logic(_ id: String) -> MembershipGossipLogic { + guard let logic = (self.logics.first { $0.localNode.node.systemName == id }) else { + fatalError("No such logic for id: \(id)") + } + + return logic + } + + var gossips: [Cluster.MembershipGossip] { + self.logics.map { $0.latestGossip } + } + + private func makeLogic(_ system: ActorSystem, _ probe: ActorTestProbe) -> MembershipGossipLogic { + MembershipGossipLogic( + GossipLogicContext( + ownerContext: self.testKit(system).makeFakeContext(), + gossipIdentifier: StringGossipIdentifier("membership") + ), + notifyOnGossipRef: probe.ref + ) + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Simulation Tests + + func test_avgRounds_untilConvergence() throws { + let systemA = self.setUpNode("A") { settings in + settings.cluster.enabled = true + } + let systemB = self.setUpNode("B") + let systemC = self.setUpNode("C") + + let initialGossipState = + """ + A.up B.joining C.joining + A: A@3 B@3 C@3 + B: A@3 B@3 C@3 + C: A@3 B@3 C@3 + """ + + try self.gossipSimulationTest( + runs: 10, + setUpPeers: { () in + [ + Cluster.MembershipGossip.parse(initialGossipState, owner: systemA.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialGossipState, owner: systemB.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialGossipState, owner: systemC.cluster.node, nodes: self.nodes), + ] + }, + updateLogic: { _ in + let logicA: MembershipGossipLogic = self.logic("A") + + // We simulate that `A` noticed it's the leader and moved `B` and `C` .up + logicA.receiveLocalGossipUpdate(Cluster.MembershipGossip.parse( + """ + A.up B.up C.up + A: A@5 B@3 C@3 + B: A@3 B@3 C@3 + C: A@3 B@3 C@3 + """, + owner: systemA.cluster.node, nodes: nodes + )) + }, + stopRunWhen: { (logics, _) in + logics.allSatisfy { $0.latestGossip.converged() } + }, + assert: { results in + results.roundCounts.max()!.shouldBeLessThanOrEqual(3) // usually 2 but 3 is tolerable; may be 1 if we're very lucky with ordering + results.messageCounts.max()!.shouldBeLessThanOrEqual(9) // usually 6, but 9 is tolerable + } + ) + } + + func test_avgRounds_manyNodes() throws { + let systemA = self.setUpNode("A") { settings in + settings.cluster.enabled = true + } + let systemB = self.setUpNode("B") + let systemC = self.setUpNode("C") + let systemD = self.setUpNode("D") + let systemE = self.setUpNode("E") + let systemF = self.setUpNode("F") + let systemG = self.setUpNode("G") + let systemH = self.setUpNode("H") + let systemI = self.setUpNode("I") + let systemJ = self.setUpNode("J") + + let allSystems = [ + systemA, systemB, systemC, systemD, systemE, + systemF, systemG, systemH, systemI, systemJ, + ] + + let initialFewGossip = + """ + A.up B.joining C.joining D.joining E.joining F.joining G.joining H.joining I.joining J.joining + A: A@9 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@5 + B: A@5 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@5 + C: A@3 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@5 + D: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + E: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + F: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + G: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + H: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + I: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + J: A@2 B@3 C@3 D@5 E@5 F@5 G@5 H@5 I@5 J@1 + """ + let initialNewGossip = + """ + D.joining E.joining F.joining G.joining H.joining I.joining J.joining + D: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + E: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + F: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + G: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + H: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + I: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + J: D@5 E@5 F@5 G@5 H@5 I@5 J@5 + """ + + try self.gossipSimulationTest( + runs: 1, + setUpPeers: { () in + [ + Cluster.MembershipGossip.parse(initialFewGossip, owner: systemA.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialFewGossip, owner: systemB.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialFewGossip, owner: systemC.cluster.node, nodes: self.nodes), + + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemD.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemE.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemF.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemG.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemH.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemI.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialNewGossip, owner: systemJ.cluster.node, nodes: self.nodes), + ] + }, + updateLogic: { _ in + let logicA: MembershipGossipLogic = self.logic("A") + let logicD: MembershipGossipLogic = self.logic("D") + + logicA.receiveLocalGossipUpdate(Cluster.MembershipGossip.parse( + """ + A.up B.up C.up D.up E.up F.up G.up H.up I.up J.up + A: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + B: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + C: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + D: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + E: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + F: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + G: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + H: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + I: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + J: A@20 B@16 C@16 D@16 E@16 F@16 G@16 H@16 I@16 J@16 + """, + owner: systemA.cluster.node, nodes: nodes + )) + + // they're trying to join + logicD.receiveLocalGossipUpdate(Cluster.MembershipGossip.parse( + """ + A.up B.up C.up D.joining E.joining F.joining G.joining H.joining I.joining J.joining + A: A@11 B@16 C@16 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + B: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + C: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + D: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + E: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + F: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + G: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + H: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + I: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + J: A@12 B@11 C@11 D@9 E@13 F@13 G@13 H@13 I@13 J@13 + """, + owner: systemD.cluster.node, nodes: nodes + )) + }, + stopRunWhen: { (logics, _) in + // keep gossiping until all members become .up and converged + logics.allSatisfy { $0.latestGossip.converged() } && + logics.allSatisfy { $0.latestGossip.membership.count(withStatus: .up) == allSystems.count } + }, + assert: { results in + results.roundCounts.max()?.shouldBeLessThanOrEqual(3) + results.messageCounts.max()?.shouldBeLessThanOrEqual(10) + } + ) + } + + func test_shouldEventuallySuspendGossiping() throws { + let systemA = self.setUpNode("A") { settings in + settings.cluster.enabled = true + } + let systemB = self.setUpNode("B") + let systemC = self.setUpNode("C") + + let initialGossipState = + """ + A.up B.joining C.joining + A: A@3 B@3 C@3 + B: A@3 B@3 C@3 + C: A@3 B@3 C@3 + """ + + try self.gossipSimulationTest( + runs: 10, + setUpPeers: { () in + [ + Cluster.MembershipGossip.parse(initialGossipState, owner: systemA.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialGossipState, owner: systemB.cluster.node, nodes: self.nodes), + Cluster.MembershipGossip.parse(initialGossipState, owner: systemC.cluster.node, nodes: self.nodes), + ] + }, + updateLogic: { _ in + let logicA: MembershipGossipLogic = self.logic("A") + + // We simulate that `A` noticed it's the leader and moved `B` and `C` .up + logicA.receiveLocalGossipUpdate(Cluster.MembershipGossip.parse( + """ + A.up B.up C.up + A: A@5 B@3 C@3 + B: A@3 B@3 C@3 + C: A@3 B@3 C@3 + """, + owner: systemA.cluster.node, nodes: nodes + )) + }, + stopRunWhen: { logics, _ in + logics.allSatisfy { logic in + logic.selectPeers(self.peers(of: logic)) == [] // no more peers to talk to + } + }, + assert: { results in + results.roundCounts.max()!.shouldBeLessThanOrEqual(4) + results.messageCounts.max()!.shouldBeLessThanOrEqual(12) + } + ) + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Simulation test infra + + func gossipSimulationTest( + runs: Int, + setUpPeers: () -> [Cluster.MembershipGossip], + updateLogic: ([MembershipGossipLogic]) -> Void, + stopRunWhen: ([MembershipGossipLogic], GossipSimulationResults) -> Bool, + assert: (GossipSimulationResults) -> Void + ) throws { + var roundCounts: [Int] = [] + var messageCounts: [Int] = [] + + var results = GossipSimulationResults( + runs: 0, + roundCounts: roundCounts, + messageCounts: messageCounts + ) + + let initialGossips = setUpPeers() + self.mockPeers = try! self.systems.map { system -> ActorRef.Message> in + let ref: ActorRef.Message> = + try system.spawn("peer", .receiveMessage { _ in .same }) + return self.systems.first!._resolveKnownRemote(ref, onRemoteSystem: system) + }.map { $0.asAddressable() } + + var log = self.systems.first!.log + log[metadataKey: "actor/path"] = "/user/peer" // mock actor path for log capture + + for _ in 1 ... runs { + // initialize with user provided gossips + self.logics = initialGossips.map { initialGossip in + let system = self.system(initialGossip.owner.node.systemName) + let probe = self.testKit(system).spawnTestProbe(expecting: Cluster.MembershipGossip.self) + let logic = self.makeLogic(system, probe) + logic.receiveLocalGossipUpdate(initialGossip) + return logic + } + + func allConverged(gossips: [Cluster.MembershipGossip]) -> Bool { + var allSatisfied = true // on purpose not via .allSatisfy() since we want to print status of each logic + for g in gossips.sorted(by: { $0.owner.node.systemName < $1.owner.node.systemName }) { + let converged = g.converged() + let convergenceStatus = converged ? "(locally assumed) converged" : "not converged" + + log.notice("\(g.owner.node.systemName): \(convergenceStatus)", metadata: [ + "gossip": Logger.MetadataValue.pretty(g), + ]) + + allSatisfied = allSatisfied && converged + } + return allSatisfied + } + + func simulateGossipRound() { + messageCounts.append(0) // make a counter for this run + + // we shuffle the gossips to simulate the slight timing differences -- not always will the "first" node be the first where the timers trigger + // and definitely not always will it always _remain_ the first to be gossiping; there may be others still gossiping around spreading their "not super complete" + // information. + let participatingGossips = self.logics.shuffled() + for logic in participatingGossips { + let selectedPeers: [AddressableActorRef] = logic.selectPeers(self.peers(of: logic)) + log.notice("[\(logic.nodeName)] selected peers: \(selectedPeers.map { $0.address.node!.node.systemName })") + + for targetPeer in selectedPeers { + messageCounts[messageCounts.endIndex - 1] += 1 + + let targetGossip = logic.makePayload(target: targetPeer) + if let gossip = targetGossip { + log.notice(" \(logic.nodeName) -> \(targetPeer.address.node!.node.systemName)", metadata: [ + "gossip": Logger.MetadataValue.pretty(gossip), + ]) + + let targetLogic = self.selectLogic(targetPeer) + let maybeAck = targetLogic.receiveGossip(gossip, from: self.peer(logic)) + log.notice("updated [\(targetPeer.address.node!.node.systemName)]", metadata: [ + "gossip": Logger.MetadataValue.pretty(targetLogic.latestGossip), + ]) + + if let ack = maybeAck { + log.notice(" \(logic.nodeName) <- \(targetPeer.address.node!.node.systemName) (ack)", metadata: [ + "ack": Logger.MetadataValue.pretty(ack), + ]) + logic.receiveAcknowledgement(ack, from: self.peer(targetLogic), confirming: gossip) + } + + } else { + () // skipping target... + } + } + } + } + + updateLogic(logics) + + var rounds = 0 + log.notice("~~~~~~~~~~~~ new gossip instance ~~~~~~~~~~~~") + while !stopRunWhen(self.logics, results) { + rounds += 1 + log.notice("Next gossip round (\(rounds))...") + simulateGossipRound() + + if rounds > 20 { + fatalError("Too many gossip rounds detected! This is definitely wrong.") + } + + results = .init( + runs: runs, + roundCounts: roundCounts, + messageCounts: messageCounts + ) + } + + roundCounts += [rounds] + } + + pinfo("Finished [\(runs)] simulation runs") + pinfo(" Rounds: \(roundCounts) (\(Double(roundCounts.reduce(0, +)) / Double(runs)) avg)") + pinfo(" Messages: \(messageCounts) (\(Double(messageCounts.reduce(0, +)) / Double(runs)) avg)") + + assert(results) + } + + struct GossipSimulationResults { + let runs: Int + var roundCounts: [Int] + var messageCounts: [Int] + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Support functions + + func peers(of logic: MembershipGossipLogic) -> [AddressableActorRef] { + Array(self.mockPeers.filter { $0.address.node! != logic.localNode }) + } + + func selectLogic(_ peer: AddressableActorRef) -> MembershipGossipLogic { + guard let uniqueNode = peer.address.node else { + fatalError("MUST have node, was: \(peer.address)") + } + + return (self.logics.first { $0.localNode == uniqueNode })! + } + + func peer(_ logic: MembershipGossipLogic) -> AddressableActorRef { + let nodeName = logic.localNode.node.systemName + if let peer = (self.mockPeers.first { $0.address.node?.node.systemName == nodeName }) { + return peer + } else { + fatalError("No addressable peer for logic: \(logic), peers: \(self.mockPeers)") + } + } +} + +private extension MembershipGossipLogic { + var nodeName: String { + self.localNode.node.systemName + } +} diff --git a/Tests/DistributedActorsTests/Cluster/MembershipGossipTests.swift b/Tests/DistributedActorsTests/Cluster/MembershipGossipTests.swift index 0e06cfb3b..48f41f061 100644 --- a/Tests/DistributedActorsTests/Cluster/MembershipGossipTests.swift +++ b/Tests/DistributedActorsTests/Cluster/MembershipGossipTests.swift @@ -39,14 +39,14 @@ final class MembershipGossipTests: XCTestCase { // MARK: Merging gossips func test_mergeForward_incomingGossip_firstGossipFromOtherNode() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.joining A: A:1 """, owner: self.nodeA, nodes: self.allNodes ) - let incoming = Cluster.Gossip.parse( + let incoming = Cluster.MembershipGossip.parse( """ B.joining B: B:1 @@ -60,7 +60,7 @@ final class MembershipGossipTests: XCTestCase { ) gossip.shouldEqual( - Cluster.Gossip.parse( + Cluster.MembershipGossip.parse( """ A.joining B.joining A: A:1 B:1 @@ -71,14 +71,14 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_firstGossipFromOtherNodes() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.joining A: A:1 """, owner: self.nodeA, nodes: self.allNodes ) - let incoming = Cluster.Gossip.parse( + let incoming = Cluster.MembershipGossip.parse( """ B.joining C.joining B: B:1 C:1 @@ -97,7 +97,7 @@ final class MembershipGossipTests: XCTestCase { ) ) - let expected = Cluster.Gossip.parse( + let expected = Cluster.MembershipGossip.parse( """ A.joining B.joining C.joining A: A:1 B:1 C:1 @@ -115,19 +115,19 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_sameVersions() { - var gossip = Cluster.Gossip(ownerNode: self.nodeA) + var gossip = Cluster.MembershipGossip(ownerNode: self.nodeA) _ = gossip.membership.join(self.nodeA) gossip.seen.incrementVersion(owner: self.nodeB, at: self.nodeA) // v: myself:1, second:1 _ = gossip.membership.join(self.nodeB) // myself:joining, second:joining - let gossipFromSecond = Cluster.Gossip(ownerNode: self.nodeB) + let gossipFromSecond = Cluster.MembershipGossip(ownerNode: self.nodeB) let directive = gossip.mergeForward(incoming: gossipFromSecond) directive.effectiveChanges.shouldEqual([]) } func test_mergeForward_incomingGossip_fromFourth_onlyKnowsAboutItself() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.joining B.joining B.joining A: A@1 B@1 C@1 @@ -135,7 +135,7 @@ final class MembershipGossipTests: XCTestCase { ) // only knows about fourth, while myGossip has first, second and third - let incomingGossip = Cluster.Gossip.parse( + let incomingGossip = Cluster.MembershipGossip.parse( """ D.joining D: D@1 @@ -150,7 +150,7 @@ final class MembershipGossipTests: XCTestCase { [Cluster.MembershipChange(node: self.fourthNode, fromStatus: nil, toStatus: .joining)] ) gossip.shouldEqual( - Cluster.Gossip.parse( + Cluster.MembershipGossip.parse( """ A.joining B.joining C.joining A: A@1 B@1 C@1 D@1 @@ -161,7 +161,7 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_localHasRemoved_incomingHasOldViewWithDownNode() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.up B.down C.up A: A@5 B@5 C@6 @@ -177,7 +177,7 @@ final class MembershipGossipTests: XCTestCase { _ = gossip.pruneMember(removedMember) gossip.incrementOwnerVersion() - let incomingOldGossip = Cluster.Gossip.parse( + let incomingOldGossip = Cluster.MembershipGossip.parse( """ A.up B.down C.up A: A@5 B@5 C@6 @@ -203,7 +203,7 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_concurrent_leaderDisagreement() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.up B.joining [leader:A] A: A@5 B@5 @@ -216,7 +216,7 @@ final class MembershipGossipTests: XCTestCase { // once the nodes talk to each other again, they will run leader election and resolve the double leader situation // until that happens, the two leaders indeed remain as-is -- as the membership itself is not the right place to resolve // who shall be leader - let incomingGossip = Cluster.Gossip.parse( + let incomingGossip = Cluster.MembershipGossip.parse( """ A.up B.joining C.up [leader:B] B: B@2 C@1 @@ -242,7 +242,7 @@ final class MembershipGossipTests: XCTestCase { ] ) - let expected = Cluster.Gossip.parse( + let expected = Cluster.MembershipGossip.parse( """ A.up B.joining C.up [leader:A] A: A:5 B:5 C:9 @@ -257,14 +257,14 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_concurrent_simple() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.up B.joining A: A@4 """, owner: self.nodeA, nodes: self.allNodes ) - let concurrent = Cluster.Gossip.parse( + let concurrent = Cluster.MembershipGossip.parse( """ A.joining B.joining B: B@2 @@ -276,7 +276,7 @@ final class MembershipGossipTests: XCTestCase { gossip.owner.shouldEqual(self.nodeA) directive.effectiveChanges.count.shouldEqual(0) gossip.shouldEqual( - Cluster.Gossip.parse( + Cluster.MembershipGossip.parse( """ A.up B.joining A: A@4 B@2 @@ -287,7 +287,7 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_hasNewNode() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.up A: A@5 @@ -310,7 +310,7 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_removal_incomingGossip_isAhead_hasRemovedNodeKnownToBeDown() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.up B.down C.up [leader:A] A: A@5 B@5 C@6 @@ -320,7 +320,7 @@ final class MembershipGossipTests: XCTestCase { owner: self.nodeA, nodes: self.allNodes ) - let incomingGossip = Cluster.Gossip.parse( + let incomingGossip = Cluster.MembershipGossip.parse( """ A.up C.up A: A@5 C@6 @@ -340,7 +340,7 @@ final class MembershipGossipTests: XCTestCase { ) gossip.shouldEqual( - Cluster.Gossip.parse( + Cluster.MembershipGossip.parse( """ A.up C.up [leader:A] A: A@5 C@7 @@ -351,7 +351,7 @@ final class MembershipGossipTests: XCTestCase { } func test_mergeForward_incomingGossip_removal_isAhead_hasMyNodeRemoved_thusWeKeepItAsRemoved() { - var gossip = Cluster.Gossip.parse( + var gossip = Cluster.MembershipGossip.parse( """ A.up B.down C.up A: A@5 B@5 C@6 @@ -361,7 +361,7 @@ final class MembershipGossipTests: XCTestCase { owner: self.nodeB, nodes: self.allNodes ) - let incomingGossip = Cluster.Gossip.parse( + let incomingGossip = Cluster.MembershipGossip.parse( """ A.up C.up A: A@5 C@6 @@ -380,7 +380,7 @@ final class MembershipGossipTests: XCTestCase { // we MIGHT receive a removal of "our node" however we must never apply such change! // we know we are `.down` and that's the most "we" will ever perceive ourselves as -- i.e. removed is only for "others". - let expected = Cluster.Gossip.parse( + let expected = Cluster.MembershipGossip.parse( """ A.up B.removed C.up A: A@5 B@5 C@6 @@ -398,7 +398,7 @@ final class MembershipGossipTests: XCTestCase { // MARK: Convergence func test_converged_shouldBeTrue_forNoMembers() { - var gossip = Cluster.Gossip(ownerNode: self.nodeA) + var gossip = Cluster.MembershipGossip(ownerNode: self.nodeA) _ = gossip.membership.join(self.nodeA) gossip.converged().shouldBeTrue() @@ -407,7 +407,7 @@ final class MembershipGossipTests: XCTestCase { } func test_converged_amongUpMembers() { - var gossip = Cluster.Gossip(ownerNode: self.nodeA) + var gossip = Cluster.MembershipGossip(ownerNode: self.nodeA) _ = gossip.membership.join(self.nodeA) _ = gossip.membership.mark(self.nodeA, as: .up) @@ -451,7 +451,7 @@ final class MembershipGossipTests: XCTestCase { } func test_converged_othersAreOnlyDown() { - let gossip = Cluster.Gossip.parse( + let gossip = Cluster.MembershipGossip.parse( """ A.up B.down A: A@8 B@5 @@ -466,7 +466,7 @@ final class MembershipGossipTests: XCTestCase { // FIXME: we should not need .joining nodes to participate on convergence() func fixme_converged_joiningOrDownMembersDoNotCount() { - var gossip = Cluster.Gossip(ownerNode: self.nodeA) + var gossip = Cluster.MembershipGossip(ownerNode: self.nodeA) _ = gossip.membership.join(self.nodeA) _ = gossip.membership.join(self.nodeB) @@ -518,8 +518,8 @@ final class MembershipGossipTests: XCTestCase { } func test_gossip_eventuallyConverges() { - func makeRandomGossip(owner node: UniqueNode) -> Cluster.Gossip { - var gossip = Cluster.Gossip(ownerNode: node) + func makeRandomGossip(owner node: UniqueNode) -> Cluster.MembershipGossip { + var gossip = Cluster.MembershipGossip(ownerNode: node) _ = gossip.membership.join(node) _ = gossip.membership.mark(node, as: .joining) var vv = VersionVector() diff --git a/Tests/DistributedActorsTests/Cluster/Protobuf/Membership+SerializationTests.swift b/Tests/DistributedActorsTests/Cluster/Protobuf/Membership+SerializationTests.swift index 696a8c5fa..7de484e76 100644 --- a/Tests/DistributedActorsTests/Cluster/Protobuf/Membership+SerializationTests.swift +++ b/Tests/DistributedActorsTests/Cluster/Protobuf/Membership+SerializationTests.swift @@ -58,7 +58,7 @@ final class MembershipSerializationTests: ActorSystemXCTestCase { } let nodes = members.map { $0.node } - let gossip = Cluster.Gossip.parse( + let gossip = Cluster.MembershipGossip.parse( """ 1.joining 2.joining 3.joining 4.up 5.up 6.up 7.up 8.up 9.down 10.down 11.up 12.up 13.up 14.up 15.up 1: 1:4 2:4 3:4 4:6 5:7 6:7 7:8 8:8 9:12 10:12 11:8 12:8 13:8 14:9 15:6 @@ -84,7 +84,7 @@ final class MembershipSerializationTests: ActorSystemXCTestCase { serialized.manifest.serializerID.shouldEqual(Serialization.SerializerID.protobufRepresentable) serialized.buffer.count.shouldEqual(2105) - let back = try system.serialization.deserialize(as: Cluster.Gossip.self, from: serialized) + let back = try system.serialization.deserialize(as: Cluster.MembershipGossip.self, from: serialized) "\(pretty: back)".shouldStartWith(prefix: "\(pretty: gossip)") // nicer human readable error back.shouldEqual(gossip) // the actual sanity check } diff --git a/Tests/DistributedActorsTests/Cluster/TestExtensions+MembershipDSL.swift b/Tests/DistributedActorsTests/Cluster/TestExtensions+MembershipDSL.swift index 1f75126f2..90f9410b5 100644 --- a/Tests/DistributedActorsTests/Cluster/TestExtensions+MembershipDSL.swift +++ b/Tests/DistributedActorsTests/Cluster/TestExtensions+MembershipDSL.swift @@ -19,21 +19,21 @@ import NIO // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Membership Testing DSL -extension Cluster.Gossip { +extension Cluster.MembershipGossip { /// First line is Membership DSL, followed by lines of the SeenTable DSL - internal static func parse(_ dsl: String, owner: UniqueNode, nodes: [UniqueNode]) -> Cluster.Gossip { + internal static func parse(_ dsl: String, owner: UniqueNode, nodes: [UniqueNode]) -> Cluster.MembershipGossip { let dslLines = dsl.split(separator: "\n") - var gossip = Cluster.Gossip(ownerNode: owner) + var gossip = Cluster.MembershipGossip(ownerNode: owner) gossip.membership = Cluster.Membership.parse(String(dslLines.first!), nodes: nodes) - gossip.seen = Cluster.Gossip.SeenTable.parse(dslLines.dropFirst().joined(separator: "\n"), nodes: nodes) + gossip.seen = Cluster.MembershipGossip.SeenTable.parse(dslLines.dropFirst().joined(separator: "\n"), nodes: nodes) return gossip } } -extension Cluster.Gossip.SeenTable { +extension Cluster.MembershipGossip.SeenTable { /// Express seen tables using a DSL /// Syntax: each line: `: @*` - internal static func parse(_ dslString: String, nodes: [UniqueNode], file: StaticString = #file, line: UInt = #line) -> Cluster.Gossip.SeenTable { + internal static func parse(_ dslString: String, nodes: [UniqueNode], file: StaticString = #file, line: UInt = #line) -> Cluster.MembershipGossip.SeenTable { let lines = dslString.split(separator: "\n") func nodeById(id: String.SubSequence) -> UniqueNode { if let found = nodes.first(where: { $0.node.systemName.contains(id) }) { @@ -43,7 +43,7 @@ extension Cluster.Gossip.SeenTable { } } - var table = Cluster.Gossip.SeenTable() + var table = Cluster.MembershipGossip.SeenTable() for line in lines { let elements = line.split(separator: " ") diff --git a/Tests/DistributedActorsTests/Cluster/TestExtensions.swift b/Tests/DistributedActorsTests/Cluster/TestExtensions.swift index 7ddcdd916..3ced7076c 100644 --- a/Tests/DistributedActorsTests/Cluster/TestExtensions.swift +++ b/Tests/DistributedActorsTests/Cluster/TestExtensions.swift @@ -38,7 +38,7 @@ extension ClusterShellState { settings: settings, channel: EmbeddedChannel(), events: EventStream(ref: ActorRef(.deadLetters(.init(log, address: ._deadLetters, system: nil)))), - gossipControl: GossipControl(ActorRef(.deadLetters(.init(log, address: ._deadLetters, system: nil)))), + gossiperControl: GossiperControl(ActorRef(.deadLetters(.init(log, address: ._deadLetters, system: nil)))), log: log ) } diff --git a/Tests/DistributedActorsTests/Gossip/GossiperShellTests.swift b/Tests/DistributedActorsTests/Gossip/GossiperShellTests.swift new file mode 100644 index 000000000..dc30c1477 --- /dev/null +++ b/Tests/DistributedActorsTests/Gossip/GossiperShellTests.swift @@ -0,0 +1,165 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import DistributedActors +import DistributedActorsTestKit +import Foundation +import NIOSSL +import XCTest + +final class GossiperShellTests: ActorSystemXCTestCase { + func peerBehavior() -> Behavior.Message> { + .receiveMessage { msg in + if "\(msg)".contains("stop") { return .stop } else { return .same } + } + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: test_down_beGossipedToOtherNodes + + func test_down_beGossipedToOtherNodes() throws { + let p = self.testKit.spawnTestProbe(expecting: [AddressableActorRef].self) + + let control = try Gossiper.spawn( + self.system, + name: "gossiper", + settings: .init( + interval: .seconds(1), + style: .unidirectional + ) + ) { _ in InspectOfferedPeersTestGossipLogic(offeredPeersProbe: p.ref) } + + let first: ActorRef.Message> = + try self.system.spawn("first", self.peerBehavior()) + let second: ActorRef.Message> = + try self.system.spawn("second", self.peerBehavior()) + + control.introduce(peer: first) + control.introduce(peer: second) + control.update(StringGossipIdentifier("hi"), payload: .init("hello")) + + try Set(p.expectMessage()).shouldEqual(Set([first.asAddressable(), second.asAddressable()])) + + first.tell(.removePayload(identifier: StringGossipIdentifier("stop"))) + try Set(p.expectMessage()).shouldEqual(Set([second.asAddressable()])) + + first.tell(.removePayload(identifier: StringGossipIdentifier("stop"))) + try p.expectNoMessage(for: .milliseconds(300)) + } + + struct InspectOfferedPeersTestGossipLogic: GossipLogic { + struct Gossip: Codable { + let metadata: String + let payload: String + + init(_ info: String) { + self.metadata = info + self.payload = info + } + } + + typealias Acknowledgement = String + + let offeredPeersProbe: ActorRef<[AddressableActorRef]> + init(offeredPeersProbe: ActorRef<[AddressableActorRef]>) { + self.offeredPeersProbe = offeredPeersProbe + } + + func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] { + self.offeredPeersProbe.tell(peers) + return [] + } + + func makePayload(target: AddressableActorRef) -> Gossip? { + nil + } + + func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: Gossip) {} + + func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> Acknowledgement? { + nil + } + + func receiveLocalGossipUpdate(_ gossip: Gossip) {} + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: test_unidirectional_yetEmitsAck_shouldWarn + + func test_unidirectional_yetReceivesAckRef_shouldWarn() throws { + let p = self.testKit.spawnTestProbe(expecting: String.self) + + let control = try Gossiper.spawn( + self.system, + name: "noAcks", + settings: .init( + interval: .milliseconds(100), + style: .unidirectional + ), + makeLogic: { _ in NoAcksTestGossipLogic(probe: p.ref) } + ) + + let first: ActorRef.Message> = + try self.system.spawn("first", self.peerBehavior()) + + control.introduce(peer: first) + control.update(StringGossipIdentifier("hi"), payload: .init("hello")) + control.ref.tell( + .gossip( + identity: StringGossipIdentifier("example"), + origin: first, .init("unexpected"), + ackRef: system.deadLetters.adapted() // this is wrong on purpose; we're configured as `unidirectional`; this should cause warnings + ) + ) + + try self.logCapture.awaitLogContaining( + self.testKit, + text: " Incoming gossip has acknowledgement actor ref and seems to be expecting an ACK, while this gossiper is configured as .unidirectional!" + ) + } + + struct NoAcksTestGossipLogic: GossipLogic { + struct Gossip: Codable { + let metadata: String + let payload: String + + init(_ info: String) { + self.metadata = info + self.payload = info + } + } + + let probe: ActorRef + + typealias Acknowledgement = String + + func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] { + peers + } + + func makePayload(target: AddressableActorRef) -> Gossip? { + .init("Hello") // legal but will produce a warning + } + + func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: Gossip) { + self.probe.tell("un-expected acknowledgement: \(acknowledgement) from \(peer) confirming \(gossip)") + } + + func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> Acknowledgement? { + nil + } + + func receiveLocalGossipUpdate(_ gossip: Gossip) {} + } +}