Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ProtobufRepresentable public #58

Merged
merged 2 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ let dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-metrics.git", from: "1.0.0"),

// ~~~ only for samples ~~~
.package(url: "https://github.com/MrLotU/SwiftPrometheus", .branch("master"))
.package(url: "https://github.com/MrLotU/SwiftPrometheus", .branch("master")),
]

let package = Package(
Expand Down
40 changes: 40 additions & 0 deletions Protos/ActorAddress.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

message ActorAddress {
UniqueNode node = 1; // TODO oneof { senderNode | recipientNode | node }
ActorPath path = 2;
uint32 incarnation = 3;
}

message ActorPath {
repeated string segments = 1;
}

message UniqueNode {
Node node = 1;
uint32 nid = 2;
}

message Node {
string protocol = 1;
string system = 2;
string hostname = 3;
uint32 port = 4;
}
2 changes: 1 addition & 1 deletion Protos/Cluster/SWIM/SWIM.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ syntax = "proto3";
option optimize_for = SPEED;
option swift_prefix = "Proto";

import "WireProtocol.proto";
import "ActorAddress.proto";

message SWIMMessage {
oneof request {
Expand Down
2 changes: 1 addition & 1 deletion Protos/SystemMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ syntax = "proto3";
option optimize_for = SPEED;
option swift_prefix = "Proto";

import "WireProtocol.proto";
import "ActorAddress.proto";

// === System Message --------------------------------------------------------------

Expand Down
24 changes: 2 additions & 22 deletions Protos/WireProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ syntax = "proto3";
option optimize_for = SPEED;
option swift_prefix = "Proto";

import "ActorAddress.proto";

// === Handshake --------------------------------------------------------------

message HandshakeOffer {
Expand Down Expand Up @@ -72,33 +74,11 @@ message SystemEnvelope {
bytes payload = 4;
}

message ActorAddress {
UniqueNode node = 1; // TODO oneof { senderNode | recipientNode | node }
ActorPath path = 2;
uint32 incarnation = 3;
}

message ActorPath {
repeated string segments = 1;
}

message SystemAck {
uint64 sequenceNr = 1;
UniqueNode from = 2;
}

message UniqueNode {
Node node = 1;
uint32 nid = 2;
}

message Node {
string protocol = 1;
string system = 2;
string hostname = 3;
uint32 port = 4;
}

// The version is represented as 4 bytes:
// - reserved: Can be used in the future for additional flags
// - major
Expand Down
8 changes: 4 additions & 4 deletions Samples/SampleMetrics/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import DistributedActors

import Metrics
import Prometheus
//import StatsdClient
// import StatsdClient

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Prometheus backend
Expand All @@ -33,16 +33,16 @@ MetricsSystem.bootstrap(prom)
// let statsdClient = try StatsdClient(host: "localhost", port: 8125)
// MetricsSystem.bootstrap(statsdClient)


// start actor system
let system = ActorSystem("Metrics") { settings in
let system = ActorSystem("Metrics") { settings in
settings.cluster.enabled = true
}

struct Talker {
enum Message {
case hello(Int, replyTo: ActorRef<Talker.Message>?)
}

static func talkTo(another talker: ActorRef<Message>?) -> Behavior<Message> {
return .setup { context in
context.log.info("Started \(context.myself.path)")
Expand Down Expand Up @@ -99,7 +99,7 @@ let t4 = try system.spawn("talker-4", props: props, Talker.talkTo(another: t3))

let m = try system.spawn("metricsPrinter", MetricPrinter.behavior)

for i in 1...10 {
for i in 1 ... 10 {
_ = try system.spawn("life-\(i)", DieAfterSomeTime.behavior)
Thread.sleep(.seconds(1))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import Foundation
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Serialization

extension SWIM.Message: ProtobufRepresentable {
typealias ProtobufType = ProtoSWIMMessage
extension SWIM.Message: InternalProtobufRepresentable {
typealias InternalProtobufRepresentation = ProtoSWIMMessage

func toProto(context: ActorSerializationContext) -> ProtoSWIMMessage {
var proto = ProtoSWIMMessage()
Expand Down Expand Up @@ -66,8 +66,8 @@ extension SWIM.Message: ProtobufRepresentable {
}
}

extension SWIM.Status: ProtobufRepresentable {
typealias ProtobufType = ProtoSWIMStatus
extension SWIM.Status: InternalProtobufRepresentable {
typealias InternalProtobufRepresentation = ProtoSWIMStatus

func toProto(context: ActorSerializationContext) -> ProtoSWIMStatus {
var proto = ProtoSWIMStatus()
Expand Down Expand Up @@ -105,8 +105,8 @@ extension SWIM.Status: ProtobufRepresentable {
}
}

extension SWIM.Payload: ProtobufRepresentable {
typealias ProtobufType = ProtoSWIMPayload
extension SWIM.Payload: InternalProtobufRepresentable {
typealias InternalProtobufRepresentation = ProtoSWIMPayload

func toProto(context: ActorSerializationContext) -> ProtoSWIMPayload {
var payload = ProtoSWIMPayload()
Expand All @@ -127,8 +127,8 @@ extension SWIM.Payload: ProtobufRepresentable {
}
}

extension SWIM.Member: ProtobufRepresentable {
typealias ProtobufType = ProtoSWIMMember
extension SWIM.Member: InternalProtobufRepresentable {
typealias InternalProtobufRepresentation = ProtoSWIMMember

func toProto(context: ActorSerializationContext) -> ProtoSWIMMember {
var proto = ProtoSWIMMember()
Expand All @@ -145,8 +145,8 @@ extension SWIM.Member: ProtobufRepresentable {
}
}

extension SWIM.Ack: ProtobufRepresentable {
typealias ProtobufType = ProtoSWIMAck
extension SWIM.Ack: InternalProtobufRepresentable {
typealias InternalProtobufRepresentation = ProtoSWIMAck

func toProto(context: ActorSerializationContext) -> ProtoSWIMAck {
var proto = ProtoSWIMAck()
Expand Down
151 changes: 151 additions & 0 deletions Sources/DistributedActors/Protobuf/ActorAddress+Serialization.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: ProtoActorAddress

extension ActorAddress {
init(_ proto: ProtoActorAddress) throws {
let path = try ActorPath(proto.path.segments.map { try ActorPathSegment($0) })
let incarnation = ActorIncarnation(Int(proto.incarnation))

// TODO: switch over senderNode | recipientNode | address
if proto.hasNode {
self = try ActorAddress(node: UniqueNode(proto.node), path: path, incarnation: incarnation)
} else {
self = ActorAddress(path: path, incarnation: incarnation)
}
}
}

extension ProtoActorAddress {
init(_ value: ActorAddress) {
if let node = value.node {
self.node = .init(node)
}
self.path = .init(value.path)
self.incarnation = value.incarnation.value
}
}

extension ActorAddress: ProtobufRepresentable {
public typealias ProtobufRepresentation = ProtoActorAddress

public func toProto(context: ActorSerializationContext) -> ProtoActorAddress {
ktoso marked this conversation as resolved.
Show resolved Hide resolved
var address = ProtoActorAddress()
let node = self.node ?? context.localNode
address.node.nid = node.nid.value
address.node.node.protocol = node.node.protocol
address.node.node.system = node.node.systemName
address.node.node.hostname = node.node.host
address.node.node.port = UInt32(node.node.port)

address.path.segments = self.segments.map { $0.value }
address.incarnation = self.incarnation.value

return address
}

public init(fromProto proto: ProtoActorAddress, context: ActorSerializationContext) throws {
let node = Node(
protocol: proto.node.node.protocol,
systemName: proto.node.node.system,
host: proto.node.node.hostname,
port: Int(proto.node.node.port)
)

let uniqueNode = UniqueNode(node: node, nid: NodeID(proto.node.nid))

// TODO: make Error
let path = try ActorPath(proto.path.segments.map { try ActorPathSegment($0) })

self.init(node: uniqueNode, path: path, incarnation: ActorIncarnation(proto.incarnation))
}
}

extension ActorRef: ProtobufRepresentable {
public typealias ProtobufRepresentation = ProtoActorAddress

public func toProto(context: ActorSerializationContext) -> ProtoActorAddress {
return self.address.toProto(context: context)
}

public init(fromProto proto: ProtoActorAddress, context: ActorSerializationContext) throws {
self = context.resolveActorRef(Message.self, identifiedBy: try ActorAddress(fromProto: proto, context: context))
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: ProtoActorPath

extension ActorPath {
init(_ proto: ProtoActorPath) throws {
guard !proto.segments.isEmpty else {
throw SerializationError.emptyRepeatedField("path.segments")
}

self.segments = try proto.segments.map { try ActorPathSegment($0) }
}
}

extension ProtoActorPath {
init(_ value: ActorPath) {
self.segments = value.segments.map { $0.value } // TODO: avoiding the mapping could be nice... store segments as strings?
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: ProtoUniqueNode

extension UniqueNode {
init(_ proto: ProtoUniqueNode) throws {
guard proto.hasNode else {
throw SerializationError.missingField("address", type: String(describing: UniqueNode.self))
}
guard proto.nid != 0 else {
throw SerializationError.missingField("uid", type: String(describing: UniqueNode.self))
}
let node = Node(proto.node)
let nid = NodeID(proto.nid)
self.init(node: node, nid: nid)
}
}

extension ProtoUniqueNode {
init(_ node: UniqueNode) {
self.node = ProtoNode(node.node)
self.nid = node.nid.value
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: ProtoNode

extension Node {
init(_ proto: ProtoNode) {
self.protocol = proto.protocol
self.systemName = proto.system
self.host = proto.hostname
self.port = Int(proto.port)
}
}

extension ProtoNode {
init(_ node: Node) {
self.protocol = node.protocol
self.system = node.systemName
self.hostname = node.host
self.port = UInt32(node.port)
}
}
Loading