Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Docs/clustering.adoc
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@

[[clustering]]
== Clustering

> One actor is no actor, they come in systems.

By connecting several nodes into a cluster you gain the ability to _transparently_ send and receive messages from actors
located on other nodes. Along with this, advanced failure detection and mitigation mechanisms are also enabled by default.

[[cluster_quickstart]]
=== Cluster QuickStart

==== Step 1: Start nodes and join them
Expand Down
254 changes: 159 additions & 95 deletions Docs/serialization.adoc

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ let system = ActorSystem("it_XPCActorable_echo_service") { settings in

settings.cluster.swim.failureDetector.pingTimeout = .seconds(3)

// settings.serialization.registerCodable(GeneratedActor.Messages.XPCEchoServiceProtocol.self, underId: 10001)
// settings.serialization.registerCodable(XPCEchoService.Message.self, underId: 10002)
// settings.serialization.registerCodable(Result<String, Error>.self, underId: 10003)
// settings.serialization.register(GeneratedActor.Messages.XPCEchoServiceProtocol.self, underId: 10001)
// settings.serialization.register(XPCEchoService.Message.self, underId: 10002)
// settings.serialization.register(Result<String, Error>.self, underId: 10003)
}

try! _file.append("service booted...\n")
Expand Down
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ var dependencies: [Package.Dependency] = [
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),

// ~~~ SSWG APIs ~~~

.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),
Expand Down
6 changes: 3 additions & 3 deletions Samples/Sources/XPCActorCaller/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ let serviceName = "com.apple.actors.xpc.GreetingsService"
let system = ActorSystem("XPCActorCaller") { settings in
settings.transports += .xpc

// settings.serialization.registerCodable(GeneratedActor.Messages.GreetingsService.self)
// settings.serialization.registerCodable(GreetingsServiceStub.Message.self)
// settings.serialization.registerCodable(Result<String, Error>.self)
// settings.serialization.register(GeneratedActor.Messages.GreetingsService.self)
// settings.serialization.register(GreetingsServiceStub.Message.self)
// settings.serialization.register(Result<String, Error>.self)
}

// TODO: we currently need a ref to the real GreetingsService... since we cannot put a Protocol.self in there...
Expand Down
6 changes: 3 additions & 3 deletions Samples/Sources/XPCActorServiceProvider/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ let system = ActorSystem("XPCActorServiceProvider") { settings in
// TODO: make this the source of "truth" what transports are available
settings.transports += .xpcService

// settings.serialization.registerCodable(GeneratedActor.Messages.GreetingsService.self)
// settings.serialization.registerCodable(GreetingsServiceImpl.Message.self)
// settings.serialization.registerCodable(Result<String, Error>.self)
// settings.serialization.register(GeneratedActor.Messages.GreetingsService.self)
// settings.serialization.register(GreetingsServiceImpl.Message.self)
// settings.serialization.register(Result<String, Error>.self)
}

let service = try XPCActorableService(system, GreetingsServiceImpl.init)
Expand Down
15 changes: 3 additions & 12 deletions Sources/DistributedActors/ActorMessages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public struct BestEffortStringError: Error, Codable, Equatable, CustomStringConv
}

/// Useful error wrapper which performs an best effort Error serialization as configured by the actor system.
public struct NotTransportableAnyError: Error, NonTransportableActorMessage {
public struct NonTransportableAnyError: Error, NonTransportableActorMessage {
public let failure: Error

public init<Failure: Error>(_ failure: Failure) {
Expand All @@ -175,23 +175,14 @@ public struct NotTransportableAnyError: Error, NonTransportableActorMessage {
/// No serializer is expected to be registered for such types.
///
/// - Warning: Attempting to send such message over the network will fail at runtime (and log an error or warning).
public protocol NonTransportableActorMessage: ActorMessage {
// Really what this would like to express is:
//
// func deepCopy(): Self
//
// Such that we could guarantee actors do not share state accidentally via references,
// and if we could prove a type is a value type it could safely `return self` here.
// While reference types would always need to perform a deep copy, or rely on copy on write semantics etc.
// OR if a reference type is known to be read-only / immutable, it could get away with sharing self as well perhaps?
}
public protocol NonTransportableActorMessage: ActorMessage {}

extension NonTransportableActorMessage {
public init(from decoder: Swift.Decoder) throws {
fatalError("Attempted to decode NonTransportableActorMessage message: \(Self.self)! This should never happen.")
}

public func encode(to encoder: Encoder) throws {
public func encode(to encoder: Swift.Encoder) throws {
fatalError("Attempted to encode NonTransportableActorMessage message: \(Self.self)! This should never happen.")
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public final class ActorSystem {
self._serialization = Serialization(settings: settings, system: self)
}

// vvv all properties initialized, self can be shared vvv
// vvv~~~~~~~~~~~~~~~~~~~ all properties initialized, self can be shared ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~vvv

// dead letters init
let overrideLogger: Logger? = settings.logging.overrideLoggerFactory.map { f in f("\(ActorPath._deadLetters)") }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ extension OperationLogClusterReceptionist {
}

public required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "\(Self.self)")
throw SerializationError.nonTransportableMessage(type: "\(Self.self)")
}

var description: String {
Expand All @@ -850,7 +850,7 @@ extension OperationLogClusterReceptionist {
}

public required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "\(Self.self)")
throw SerializationError.nonTransportableMessage(type: "\(Self.self)")
}

var description: String {
Expand Down
6 changes: 3 additions & 3 deletions Sources/DistributedActors/Mailbox.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ internal final class Mailbox<Message: ActorMessage> {
self.address = shell._address

// TODO: not entirely happy about the added weight, but I suppose avoiding going all the way "into" the settings on each send is even worse?
self.serializeAllMessages = shell.system.settings.serialization.allMessages
self.serializeAllMessages = shell.system.settings.serialization.serializeLocalMessages
}

#if SACT_TESTS_LEAKS
Expand All @@ -103,7 +103,7 @@ internal final class Mailbox<Message: ActorMessage> {
self.address = system.deadLetters.address

// TODO: not entirely happy about the added weight, but I suppose avoiding going all the way "into" the settings on each send is even worse?
self.serializeAllMessages = system.settings.serialization.allMessages
self.serializeAllMessages = system.settings.serialization.serializeLocalMessages
}

@inlinable
Expand All @@ -118,7 +118,7 @@ internal final class Mailbox<Message: ActorMessage> {
} catch {
fatalError("Serialization check failed for message \(messageDescription) sent at \(file):\(line). " +
"Make sure this type has either a serializer registered OR is marked as `NonTransportableActorMessage`. " +
"This check was performed since `settings.serialization.allMessages` was enabled.")
"This check was performed since `settings.serialization.serializeLocalMessages` was enabled.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ extension ConvergentGossip.Message {
try container.encode(DiscriminatorKeys.gossip, forKey: ._case)
try container.encode(envelope, forKey: .gossip_envelope)
default:
throw SerializationError.notTransportableMessage(type: "\(self)")
throw SerializationError.nonTransportableMessage(type: "\(self)")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,19 @@ extension _SystemMessage: ProtobufRepresentable {
proto.payload = .terminated(terminated)

case .carrySignal(let signal):
throw SerializationError.notTransportableMessage(type: "SystemMessage.carrySignal(\(signal))")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.carrySignal(\(signal))")
case .start:
throw SerializationError.notTransportableMessage(type: "SystemMessage.start")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.start")
case .nodeTerminated:
throw SerializationError.notTransportableMessage(type: "SystemMessage.addressTerminated")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.addressTerminated")
case .childTerminated:
throw SerializationError.notTransportableMessage(type: "SystemMessage.childTerminated")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.childTerminated")
case .resume:
throw SerializationError.notTransportableMessage(type: "SystemMessage.resume")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.resume")
case .stop:
throw SerializationError.notTransportableMessage(type: "SystemMessage.stop")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.stop")
case .tombstone:
throw SerializationError.notTransportableMessage(type: "SystemMessage.tombstone")
throw SerializationError.nonTransportableMessage(type: "SystemMessage.tombstone")
}
return proto
}
Expand Down
10 changes: 5 additions & 5 deletions Sources/DistributedActors/Receptionist.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public enum Receptionist {
}

public required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "")
throw SerializationError.nonTransportableMessage(type: "")
}

internal override var _addressableActorRef: AddressableActorRef {
Expand Down Expand Up @@ -134,7 +134,7 @@ public enum Receptionist {
}

required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "\(Self.self)")
throw SerializationError.nonTransportableMessage(type: "\(Self.self)")
}

override func replyWith(_ refs: Set<AddressableActorRef>) {
Expand Down Expand Up @@ -162,7 +162,7 @@ public enum Receptionist {
}

public required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "\(Self.self)")
throw SerializationError.nonTransportableMessage(type: "\(Self.self)")
}

internal override var _key: _RegistrationKey {
Expand Down Expand Up @@ -430,7 +430,7 @@ public class _Lookup: ReceptionistMessage, NonTransportableActorMessage {
}

required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "\(Self.self)")
throw SerializationError.nonTransportableMessage(type: "\(Self.self)")
}

func replyWith(_ refs: Set<AddressableActorRef>) {
Expand Down Expand Up @@ -542,7 +542,7 @@ public class _Subscribe: ReceptionistMessage, NonTransportableActorMessage {
}

required init(from decoder: Decoder) throws {
throw SerializationError.notTransportableMessage(type: "\(Self.self)")
throw SerializationError.nonTransportableMessage(type: "\(Self.self)")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extension Decodable {
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Encodable + _encode(bytes:using:SomeDecoder) extensions

// TODO: once we can abstract over Coders all these could go away most likely (and accept a generic TopLevelCoder
// TODO: once we can abstract over Coders all these could go away most likely (and accept a generic TopLevelCoder)

extension Encodable {
func _encode(using encoder: JSONEncoder, allocator: ByteBufferAllocator) throws -> NIO.ByteBuffer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ extension Serialization {
/// bytes from the message envelope size on the wire.
public struct Manifest: Codable, Hashable {
/// Serializer used to serialize accompanied message.
///
/// A serializerID of zero (`0`), implies that this specific message is never intended to be serialized.
public let serializerID: SerializerID

/// A "hint" for the serializer what data type is serialized in the accompanying payload.
Expand Down Expand Up @@ -131,9 +129,10 @@ extension Serialization {
#endif

let manifest: Manifest?
if messageType is Codable.Type {
let defaultCodableSerializerID = self.settings.defaultSerializerID
manifest = Manifest(serializerID: defaultCodableSerializerID, hint: hint)
if messageType is AnyProtobufRepresentable.Type {
manifest = Manifest(serializerID: .protobufRepresentable, hint: hint)
} else if messageType is Codable.Type {
manifest = Manifest(serializerID: self.settings.defaultSerializerID, hint: hint)
} else if messageType is NonTransportableActorMessage.Type {
manifest = Manifest(serializerID: .doNotSerialize, hint: nil)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extension Serialization {
switch self.value {
case SerializerID.doNotSerialize.value:
return "serializerID:doNotSerialize(\(self.value))"
case SerializerID.specialized.value:
case SerializerID.specializedWithTypeHint.value:
return "serializerID:specialized(\(self.value))"
case SerializerID.foundationJSON.value:
return "serializerID:jsonCodable(\(self.value))"
Expand Down Expand Up @@ -63,7 +63,7 @@ extension Serialization.SerializerID {
// ~~~~~~~~~~~~~~~~ general purpose serializer ids ~~~~~~~~~~~~~~~~
public static let doNotSerialize: SerializerID = 0

public static let specialized: SerializerID = 1
public static let specializedWithTypeHint: SerializerID = 1
public static let foundationJSON: SerializerID = 2
// public static let foundationPropertyList: SerializerID = 3 // TODO: https://github.com/apple/swift-distributed-actors/issues/513
public static let protobufRepresentable: SerializerID = 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ extension Serializer: AnySerializer {
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: NotTransportableSerializer
// MARK: NonTransportableSerializer

/// Nope, as opposed to Noop
internal class NotTransportableSerializer<Message>: Serializer<Message> {
internal class NonTransportableSerializer<Message>: Serializer<Message> {
override func serialize(_ message: Message) throws -> ByteBuffer {
throw SerializationError.unableToSerialize(hint: "\(Self.self): \(Message.self)")
}
Expand Down
Loading