Skip to content

Commit

Permalink
CRDT Serialization
Browse files Browse the repository at this point in the history
Motivation:
For direct replication (#27) we need to send CRDTs to remote replicators, therefore they need to be serializable.

Modifications:
- Define protobufs for CRDTs and conform to `InternalProtobufRepresentable`.
- Define protobufs for `CRDT.Replicator.RemoteCommand`.
- Special handling in `Serialization` for CRDT serializers.

Result:
Be able to send `CRDT.Replicator.RemoteCommand` message to remote replicators.
  • Loading branch information
yim-lee authored and ktoso committed Sep 6, 2019
1 parent 0b4e230 commit 2fef7d3
Show file tree
Hide file tree
Showing 19 changed files with 2,036 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,24 @@ isolated.run(on: .master) {
multiplier: 1.5,
randomFactor: 0
)
)
)
))
}

try isolated.run(on: .servant) {
isolated.system.log.info("ISOLATED RUNNING: \(CommandLine.arguments)")

_ = try isolated.system.spawn("failed", of: String.self,
props: Props().supervision(strategy: .escalate),
.setup { context in
context.log.info("Spawned \(context.path) on servant node it will fail soon...")
context.timers.startSingle(key: "explode", message: "Boom", delay: .seconds(1))
props: Props().supervision(strategy: .escalate),
.setup { context in
context.log.info("Spawned \(context.path) on servant node it will fail soon...")
context.timers.startSingle(key: "explode", message: "Boom", delay: .seconds(1))

return .receiveMessage { message in
context.log.error("Time to crash with: fatalError")
// crashes process since we do not isolate faults
fatalError("FATAL ERROR ON PURPOSE")
}
})
return .receiveMessage { _ in
context.log.error("Time to crash with: fatalError")
// crashes process since we do not isolate faults
fatalError("FATAL ERROR ON PURPOSE")
}
})
}

// finally, once prepared, you have to invoke the following:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,26 @@ try isolated.run(on: .servant) {
// TODO: assert command line arguments are the expected ones

_ = try isolated.system.spawn("failed", of: String.self,
props: Props().supervision(strategy: .escalate),
.setup { context in
context.log.info("Spawned \(context.path) on servant node it will fail soon...")
context.timers.startSingle(key: "explode", message: "Boom", delay: .seconds(1))
props: Props().supervision(strategy: .escalate),
.setup { context in
context.log.info("Spawned \(context.path) on servant node it will fail soon...")
context.timers.startSingle(key: "explode", message: "Boom", delay: .seconds(1))

return .receiveMessage { message in
if CommandLine.arguments.contains("fatalError") {
context.log.error("Time to crash with: fatalError")
// crashes process since we do not isolate faults
fatalError("FATAL ERROR ON PURPOSE")
} else if CommandLine.arguments.contains("escalateError") {
context.log.error("Time to crash with: throwing an error, escalated to top level")
// since we .escalate and are a top-level actor, this will cause the process to die as well
throw OnPurposeBoom()
} else {
context.log.error("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly. \(CommandLine.arguments)")
fatalError("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly. \(CommandLine.arguments)")
}
}
})
return .receiveMessage { _ in
if CommandLine.arguments.contains("fatalError") {
context.log.error("Time to crash with: fatalError")
// crashes process since we do not isolate faults
fatalError("FATAL ERROR ON PURPOSE")
} else if CommandLine.arguments.contains("escalateError") {
context.log.error("Time to crash with: throwing an error, escalated to top level")
// since we .escalate and are a top-level actor, this will cause the process to die as well
throw OnPurposeBoom()
} else {
context.log.error("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly. \(CommandLine.arguments)")
fatalError("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly. \(CommandLine.arguments)")
}
}
})
}

// finally, once prepared, you have to invoke the following:
Expand Down
57 changes: 57 additions & 0 deletions Protos/CRDT/CRDT.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

import "ActorAddress.proto";

// TODO: consider making this file or part of it public since it contains proto types that can be used for custom CRDTs

// ===== CRDT.Identity =====

message CRDTIdentity {
string id = 1;
}

// ===== CRDT.ReplicaId =====

message CRDTReplicaId {
oneof value {
CRDTReplicaId_ActorAddress actorAddress = 1;
}
}

message CRDTReplicaId_ActorAddress {
ActorAddress actorAddress = 1;
}

// ===== CRDT.GCounter =====

message CRDTGCounter {
message ReplicaState {
CRDTReplicaId replicaId = 1;
uint64 count = 2;
}

message Delta {
repeated ReplicaState state = 1;
}

CRDTReplicaId replicaId = 1;
// Not a map because only integral or string type can be keys
repeated ReplicaState state = 2;
}
28 changes: 28 additions & 0 deletions Protos/CRDT/CRDTAny.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

message AnyCvRDT {
uint32 underlyingSerializerId = 1;
bytes underlyingBytes = 2;
}

message AnyDeltaCRDT {
uint32 underlyingSerializerId = 1;
bytes underlyingBytes = 2;
}
69 changes: 69 additions & 0 deletions Protos/CRDT/CRDTReplication.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

import "ActorAddress.proto";
import "CRDT/CRDT.proto";
import "CRDT/CRDTAny.proto";

// ===== CRDT.Replicator.Message =====

message CRDTReplicatorMessage {
oneof value {
CRDTWrite write = 1;
}
}

// ===== CRDT.Replicator.RemoteCommand.write =====

message CRDTWrite {
CRDTIdentity identity = 1;
ReplicatedData data = 2;
ActorAddress replyTo = 3;
}

message CRDTWriteResult {
enum Type {
SUCCESS = 0;
FAILED = 1;
}

Type type = 1;
CRDTWriteError error = 2;
}

message CRDTWriteError {
enum Type {
MISSING_CRDT_FOR_DELTA = 0;
INCORRECT_DELTA_TYPE = 1;
CANNOT_WRITE_DELTA_FOR_NON_DELTA_CRDT = 2;
INPUT_AND_STORED_DATA_TYPE_MISMATCH = 3;
UNSUPPORTED_CRDT = 4;
}

Type type = 1;
}

// ===== Common =====

message ReplicatedData {
oneof payload {
AnyCvRDT cvrdt = 1;
AnyDeltaCRDT deltaCrdt = 2;
}
}
36 changes: 18 additions & 18 deletions Sources/DistributedActors/CRDT/CRDT+Any.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ internal struct AnyCvRDT: CvRDT, AnyStateBasedCRDT {
var underlying: StateBasedCRDT
let _merge: (StateBasedCRDT, StateBasedCRDT) -> StateBasedCRDT

init<DataType: CvRDT>(_ data: DataType) {
self.metaType = MetaType(DataType.self)
init<T: CvRDT>(_ data: T) {
self.metaType = MetaType(T.self)
self.underlying = data
self._merge = AnyCvRDT._merge(DataType.self)
self._merge = AnyCvRDT._merge(T.self)
}
}

Expand Down Expand Up @@ -104,30 +104,30 @@ internal struct AnyDeltaCRDT: DeltaCRDT, AnyStateBasedCRDT {
return self._delta(self.underlying)
}

init<DataType: DeltaCRDT>(_ data: DataType) {
self.metaType = MetaType(DataType.self)
init<T: DeltaCRDT>(_ data: T) {
self.metaType = MetaType(T.self)
self.underlying = data
self._merge = AnyDeltaCRDT._merge(DataType.self)
self._merge = AnyDeltaCRDT._merge(T.self)

self.deltaMetaType = MetaType(DataType.Delta.self)
self._delta = { dt in
let dt: DataType = dt as! DataType // as! safe, since `dt` should be `self.underlying`
switch dt.delta {
self.deltaMetaType = MetaType(T.Delta.self)
self._delta = { data in
let data: T = data as! T // as! safe, since `data` should be `self.underlying`
switch data.delta {
case .none:
return nil
case .some(let d):
return d.asAnyCvRDT
}
}
self._mergeDelta = { dt, d in
let dt = dt as! DataType // as! safe, since `dt` should be `self.underlying`
let d: DataType.Delta = d.underlying as! DataType.Delta // as! safe, since invoking _mergeDelta is protected by checking the `deltaMetaType`
return dt.mergingDelta(d)
self._mergeDelta = { data, delta in
let data = data as! T // as! safe, since `data` should be `self.underlying`
let delta: T.Delta = delta.underlying as! T.Delta // as! safe, since invoking _mergeDelta is protected by checking the `deltaMetaType`
return data.mergingDelta(delta)
}
self._resetDelta = { dt in
var dt: DataType = dt as! DataType // as! safe, since `dt` should be `self.underlying`
dt.resetDelta()
return dt
self._resetDelta = { data in
var data: T = data as! T // as! safe, since `data` should be `self.underlying`
data.resetDelta()
return data
}
}

Expand Down
Loading

0 comments on commit 2fef7d3

Please sign in to comment.