Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 21 additions & 31 deletions Sources/DistributedActors/CRDT/CRDT+Gossip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ extension CRDT {
/// It collaborates with the Direct Replicator in order to avoid needlessly sending values to nodes which already know
/// about them (e.g. through direct replication).
final class GossipReplicatorLogic: GossipLogic {
typealias Envelope = CRDT.Gossip
typealias Gossip = CRDT.Gossip
typealias Acknowledgement = CRDT.GossipAck

let identity: CRDT.Identity
let context: Context
Expand Down Expand Up @@ -54,7 +55,7 @@ extension CRDT {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Spreading gossip

func selectPeers(peers: [AddressableActorRef]) -> [AddressableActorRef] {
func selectPeers(_ peers: [AddressableActorRef]) -> [AddressableActorRef] {
// how many peers we select in each gossip round,
// we could for example be dynamic and notice if we have 10+ nodes, we pick 2 members to speed up the dissemination etc.
let n = 1
Expand All @@ -79,32 +80,34 @@ extension CRDT {
self.latest
}

func receivePayloadACK(target: AddressableActorRef, confirmedDeliveryOf envelope: CRDT.Gossip) {
guard (self.latest.map { $0.payload.equalState(to: envelope.payload) } ?? false) else {
func receiveAcknowledgement(_ acknowledgement: Acknowledgement, from peer: AddressableActorRef, confirming gossip: CRDT.Gossip) {
guard (self.latest.map { $0.payload.equalState(to: gossip.payload) } ?? false) else {
// received an ack for something, however it's not the "latest" anymore, so we need to gossip to target anyway
return
}

// TODO: in v3 this would translate to ACKing specific deltas for this target
// good, the latest gossip is still the same as was confirmed here, so we can mark it acked
self.peersAckedOurLatestGossip.insert(target.address)
self.peersAckedOurLatestGossip.insert(peer.address)
}

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Receiving gossip

func receiveGossip(origin: AddressableActorRef, payload: CRDT.Gossip) {
func receiveGossip(_ gossip: Gossip, from peer: AddressableActorRef) -> CRDT.GossipAck? {
// merge the datatype locally, and update our information about the origin's knowledge about this datatype
// (does it already know about our data/all-deltas-we-are-aware-of or not)
self.mergeInbound(from: origin, payload)
self.mergeInbound(from: peer, gossip)

// notify the direct replicator to update all local `actorOwned` CRDTs.
// TODO: the direct replicator may choose to delay flushing this information a bit to avoid much data churn see `settings.crdt.`
self.replicatorControl.tellGossipWrite(id: self.identity, data: payload.payload)
self.replicatorControl.tellGossipWrite(id: self.identity, data: gossip.payload)

return CRDT.GossipAck()
}

func localGossipUpdate(payload: CRDT.Gossip) {
self.mergeInbound(from: nil, payload)
func receiveLocalGossipUpdate(_ gossip: CRDT.Gossip) {
self.mergeInbound(from: nil, gossip)
// during the next gossip round we'll gossip the latest, most up-to-date version;
// no need to schedule that, we'll be called when it's time.
}
Expand All @@ -117,8 +120,8 @@ extension CRDT {
// and gossip replicator, so that's a downside that we'll eventually want to address.

enum SideChannelMessage {
case localUpdate(Envelope)
case ack(origin: AddressableActorRef, payload: Envelope)
case localUpdate(Gossip)
case ack(origin: AddressableActorRef, payload: Gossip)
}

func receiveSideChannelMessage(message: Any) throws {
Expand Down Expand Up @@ -177,15 +180,10 @@ extension CRDT.Identity: GossipIdentifier {

extension CRDT {
/// The gossip to be spread about a specific CRDT (identity).
struct Gossip: GossipEnvelopeProtocol, CustomStringConvertible, CustomPrettyStringConvertible {
struct Metadata: Codable {}
typealias Payload = StateBasedCRDT

var metadata: Metadata
var payload: Payload
struct Gossip: Codable, CustomStringConvertible, CustomPrettyStringConvertible {
var payload: StateBasedCRDT

init(metadata: CRDT.Gossip.Metadata, payload: CRDT.Gossip.Payload) {
self.metadata = metadata
init(payload: StateBasedCRDT) {
self.payload = payload
}

Expand All @@ -198,15 +196,15 @@ extension CRDT {
}

var description: String {
"CRDT.Gossip(metadata: \(metadata), payload: \(payload))"
"CRDT.Gossip(\(payload))"
}
}

struct GossipAck: Codable {}
}

extension CRDT.Gossip {
enum CodingKeys: CodingKey {
case metadata
case metadataManifest
case payload
case payloadManifest
}
Expand All @@ -218,10 +216,6 @@ extension CRDT.Gossip {

let container = try decoder.container(keyedBy: CodingKeys.self)

let manifestData = try container.decode(Data.self, forKey: .metadata)
let manifestManifest = try container.decode(Serialization.Manifest.self, forKey: .metadataManifest)
self.metadata = try context.serialization.deserialize(as: Metadata.self, from: .data(manifestData), using: manifestManifest)

let payloadData = try container.decode(Data.self, forKey: .payload)
let payloadManifest = try container.decode(Serialization.Manifest.self, forKey: .payloadManifest)
self.payload = try context.serialization.deserialize(as: StateBasedCRDT.self, from: .data(payloadData), using: payloadManifest)
Expand All @@ -237,9 +231,5 @@ extension CRDT.Gossip {
let serializedPayload = try context.serialization.serialize(self.payload)
try container.encode(serializedPayload.buffer.readData(), forKey: .payload)
try container.encode(serializedPayload.manifest, forKey: .payloadManifest)

let serializedMetadata = try context.serialization.serialize(self.metadata)
try container.encode(serializedMetadata.buffer.readData(), forKey: .metadata)
try container.encode(serializedMetadata.manifest, forKey: .metadataManifest)
}
}
4 changes: 4 additions & 0 deletions Sources/DistributedActors/CRDT/CRDT+Replication.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ extension CRDT {
/// during this round
public var gossipInterval: TimeAmount = .seconds(2)

/// Timeout used when asking another peer for an acknowledgement while spreading gossip.
/// Timeouts are logged, but are not by themselves "errors", as we may still eventually be able to spread the payload to the given peer.
public var gossipAcknowledgementTimeout: TimeAmount = .milliseconds(500)

/// Adds a random factor to the gossip interval, which is useful to avoid an entire cluster ticking "synchronously"
/// at the same time, causing spikes in gossip traffic (as all nodes decide to gossip in the same second).
///
Expand Down
20 changes: 8 additions & 12 deletions Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extension CRDT.Replicator {
typealias RemoteDeleteResult = CRDT.Replicator.RemoteCommand.DeleteResult

private let directReplicator: CRDT.Replicator.Instance
private var gossipReplication: GossipControl<CRDT.Gossip>!
private var gossipReplication: GossiperControl<CRDT.Gossip, CRDT.GossipAck>!

// TODO: better name; this is the control from Gossip -> Local
struct LocalControl {
Expand Down Expand Up @@ -71,12 +71,13 @@ extension CRDT.Replicator {
}
)

self.gossipReplication = try Gossiper.start(
self.gossipReplication = try Gossiper.spawn(
context,
name: "gossip",
settings: Gossiper.Settings(
gossipInterval: self.settings.gossipInterval,
gossipIntervalRandomFactor: self.settings.gossipIntervalRandomFactor,
interval: self.settings.gossipInterval,
intervalRandomFactor: self.settings.gossipIntervalRandomFactor,
style: .acknowledged(timeout: .seconds(1)),
peerDiscovery: .fromReceptionistListing(id: "crdt-gossip-replicator")
),
makeLogic: { logicContext in
Expand Down Expand Up @@ -267,10 +268,8 @@ extension CRDT.Replicator {
_ id: CRDT.Identity,
_ data: StateBasedCRDT
) {
let gossip = CRDT.Gossip(
metadata: .init(),
payload: data // TODO: v2, allow tracking the deltas here
)
// TODO: v2, allow tracking the deltas here
let gossip = CRDT.Gossip(payload: data)
self.gossipReplication.update(id, payload: gossip)
}

Expand Down Expand Up @@ -407,10 +406,7 @@ extension CRDT.Replicator {
replyTo.tell(.success(updatedData))

// Update the data stored in the replicator (yeah today we store 2 copies in the replicators, we could converge them into one with enough effort)
let gossip = CRDT.Gossip(
metadata: .init(),
payload: updatedData
)
let gossip = CRDT.Gossip(payload: updatedData)
self.gossipReplication.update(id, payload: gossip) // TODO: v2, allow tracking the deltas here

// Followed by notifying all owners since the CRDT might have been updated
Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedActors/Cluster/Cluster+Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ extension Cluster.Membership: Hashable {
extension Cluster.Membership: CustomStringConvertible, CustomDebugStringConvertible, CustomPrettyStringConvertible {
/// Pretty multi-line output of a membership, useful for manual inspection
public var prettyDescription: String {
var res = "LEADER: \(self.leader, orElse: ".none")"
var res = "leader: \(self.leader, orElse: ".none")"
for member in self._members.values.sorted(by: { $0.node.node.port < $1.node.node.port }) {
res += "\n \(reflecting: member.node) STATUS: [\(member.status.rawValue, leftPadTo: Cluster.MemberStatus.maxStrLen)]"
res += "\n \(reflecting: member.node) status [\(member.status.rawValue, leftPadTo: Cluster.MemberStatus.maxStrLen)]"
}
return res
}
Expand Down
148 changes: 0 additions & 148 deletions Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift

This file was deleted.

6 changes: 6 additions & 0 deletions Sources/DistributedActors/Cluster/ClusterSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public struct ClusterSettings {

public var membershipGossipInterval: TimeAmount = .seconds(1)

// Since we talk to many peers one by one, even as we proceed to the next round after `membershipGossipInterval`
// it is fine if we get a reply from the previously gossiped-to peer after the same or a similar timeout. No rush about it.
//
// A missing ACK is not terminal; it may happen, and we'll then gossip with that peer again (e.g. if it has had some form of network trouble for a moment).
public var membershipGossipAcknowledgementTimeout: TimeAmount = .seconds(1)

public var membershipGossipIntervalRandomFactor: Double = 0.2

// ==== ------------------------------------------------------------------------------------------------------------
Expand Down
Loading