Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRDT Serialization #92

Merged
merged 6 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ isolated.run(on: .master) {
multiplier: 1.5,
randomFactor: 0
)
)
)
))
}

try isolated.run(on: .servant) {
isolated.system.log.info("ISOLATED RUNNING: \(CommandLine.arguments)")

// swiftformat:disable indent unusedArguments wrapArguments
_ = try isolated.system.spawn("failed", of: String.self,
props: Props().supervision(strategy: .escalate),
.setup { context in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ try isolated.run(on: .servant) {

// TODO: assert command line arguments are the expected ones

// swiftformat:disable indent unusedArguments wrapArguments
_ = try isolated.system.spawn("failed", of: String.self,
props: Props().supervision(strategy: .escalate),
.setup { context in
Expand Down
2 changes: 0 additions & 2 deletions Protos/ActorAddress.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ syntax = "proto3";
option optimize_for = SPEED;
option swift_prefix = "Proto";

// TODO: how do we distribute public .proto such as this so that framework users can import it in their .proto?

message ActorAddress {
UniqueNode node = 1; // TODO oneof { senderNode | recipientNode | node }
ActorPath path = 2;
Expand Down
74 changes: 74 additions & 0 deletions Protos/CRDT/CRDT.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

import "Clocks/VersionVector.proto";

// ==== Common and Core CRDT parts -------------------------------------------------------------------------------------

// ===== CRDT.Identity =====

message CRDTIdentity {
string id = 1;
}

// ===== Versioned Container =====

message CRDTVersionedContainer {
VersionReplicaId replicaId = 1;
CRDTVersionContext versionContext = 2;
repeated VersionDottedElementEnvelope elementByBirthDot = 3;
CRDTVersionedContainerDelta delta = 4;
}

message CRDTVersionContext {
VersionVector versionVector = 1;
repeated VersionDot gaps = 2;
}

message CRDTVersionedContainerDelta {
CRDTVersionContext versionContext = 1;
repeated VersionDottedElementEnvelope elementByBirthDot = 2;
}

// ==== CRDT types -----------------------------------------------------------------------------------------------------

// ===== CRDT.GCounter =====

message CRDTGCounter {
message ReplicaState {
VersionReplicaId replicaId = 1;
uint64 count = 2;
}

message Delta {
repeated ReplicaState state = 1;
}

VersionReplicaId replicaId = 1;
// Not a map because only integral or string type can be keys
repeated ReplicaState state = 2;
Delta delta = 3;
}

// ===== CRDT.ORSet =====

message CRDTORSet {
VersionReplicaId replicaId = 1;
CRDTVersionedContainer state = 2;
}
yim-lee marked this conversation as resolved.
Show resolved Hide resolved
76 changes: 76 additions & 0 deletions Protos/CRDT/CRDTReplication.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

import "ActorAddress.proto";
import "CRDT/CRDT.proto";

// ==== Wire envelope for CRDT types -----------------------------------------------------------------------------------

message CRDTEnvelope {
enum Boxed {
/// Box as `AnyCvRDT` when deserializing
ANY_CVRDT = 0;
/// Box as `AnyDeltaCRDT` when deserializing
ANY_DELTA_CRDT = 1;
}
ktoso marked this conversation as resolved.
Show resolved Hide resolved

uint32 serializerId = 1;
bytes payload = 2;
Boxed boxed = 3;
}

// ==== CRDT Direct Replication ----------------------------------------------------------------------------------------

// ===== CRDT.Replicator.Message =====

message CRDTReplicatorMessage {
oneof value {
CRDTWrite write = 1;
}
}

// ===== CRDT.Replicator.RemoteCommand.write =====

message CRDTWrite {
CRDTIdentity identity = 1;
CRDTEnvelope envelope = 2;
ActorAddress replyTo = 3;
}

message CRDTWriteResult {
enum Type {
SUCCESS = 0;
FAILED = 1;
}

Type type = 1;
CRDTWriteError error = 2;
}

message CRDTWriteError {
enum Type {
MISSING_CRDT_FOR_DELTA = 0;
INCORRECT_DELTA_TYPE = 1;
CANNOT_WRITE_DELTA_FOR_NON_DELTA_CRDT = 2;
INPUT_AND_STORED_DATA_TYPE_MISMATCH = 3;
UNSUPPORTED_CRDT = 4;
}

Type type = 1;
}
50 changes: 50 additions & 0 deletions Protos/Clocks/VersionVector.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//


syntax = "proto3";

option optimize_for = SPEED;
option swift_prefix = "Proto";

import "ActorAddress.proto";

message ReplicaVersion {
VersionReplicaId replicaId = 1;
uint32 version = 2;
}

message VersionVector {
// not a map since we cannot use `replicaId` as key
repeated ReplicaVersion state = 1;
}

message VersionDottedElementEnvelope {
VersionDot dot = 1;

// ~~ element ~~
uint32 serializerId = 2;
bytes payload = 3;
}

message VersionDot {
VersionReplicaId replicaId = 1;
uint64 version = 2;
}

message VersionReplicaId {
oneof value {
ActorAddress actorAddress = 1;
}
}
2 changes: 1 addition & 1 deletion Sources/DistributedActors/ActorAddress.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public struct ActorAddress: Equatable, Hashable {
public var node: UniqueNode? {
switch self._location {
case .local:
return nil
return nil // TODO: we could make it such that we return the owning address :thinking:
case .remote(let remote):
return remote
}
Expand Down
37 changes: 19 additions & 18 deletions Sources/DistributedActors/CRDT/CRDT+Any.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

/// Protocol adopted by any CRDT type, including their delta types
internal protocol AnyStateBasedCRDT {
var metaType: AnyMetaType { get }
var underlying: StateBasedCRDT { get set }
Expand Down Expand Up @@ -69,10 +70,10 @@ internal struct AnyCvRDT: CvRDT, AnyStateBasedCRDT {
var underlying: StateBasedCRDT
let _merge: (StateBasedCRDT, StateBasedCRDT) -> StateBasedCRDT

init<DataType: CvRDT>(_ data: DataType) {
self.metaType = MetaType(DataType.self)
init<T: CvRDT>(_ data: T) {
self.metaType = MetaType(T.self)
self.underlying = data
self._merge = AnyCvRDT._merge(DataType.self)
self._merge = AnyCvRDT._merge(T.self)
}
}

Expand Down Expand Up @@ -104,30 +105,30 @@ internal struct AnyDeltaCRDT: DeltaCRDT, AnyStateBasedCRDT {
return self._delta(self.underlying)
}

init<DataType: DeltaCRDT>(_ data: DataType) {
self.metaType = MetaType(DataType.self)
init<T: DeltaCRDT>(_ data: T) {
self.metaType = MetaType(T.self)
self.underlying = data
self._merge = AnyDeltaCRDT._merge(DataType.self)
self._merge = AnyDeltaCRDT._merge(T.self)

self.deltaMetaType = MetaType(DataType.Delta.self)
self._delta = { dt in
let dt: DataType = dt as! DataType // as! safe, since `dt` should be `self.underlying`
switch dt.delta {
self.deltaMetaType = MetaType(T.Delta.self)
self._delta = { data in
let data: T = data as! T // as! safe, since `data` should be `self.underlying`
switch data.delta {
case .none:
return nil
case .some(let d):
return d.asAnyCvRDT
}
}
self._mergeDelta = { dt, d in
let dt = dt as! DataType // as! safe, since `dt` should be `self.underlying`
let d: DataType.Delta = d.underlying as! DataType.Delta // as! safe, since invoking _mergeDelta is protected by checking the `deltaMetaType`
return dt.mergingDelta(d)
self._mergeDelta = { data, delta in
let data = data as! T // as! safe, since `data` should be `self.underlying`
let delta: T.Delta = delta.underlying as! T.Delta // as! safe, since invoking _mergeDelta is protected by checking the `deltaMetaType`
return data.mergingDelta(delta)
}
self._resetDelta = { dt in
var dt: DataType = dt as! DataType // as! safe, since `dt` should be `self.underlying`
dt.resetDelta()
return dt
self._resetDelta = { data in
var data: T = data as! T // as! safe, since `data` should be `self.underlying`
data.resetDelta()
return data
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedActors/CRDT/CRDT+ORSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extension CRDT {
/// - SeeAlso: [Optimizing state-based CRDTs (part 2)](https://bartoszsypytkowski.com/optimizing-state-based-crdts-part-2/)
/// - SeeAlso: [A comprehensive study of CRDTs](https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf)
public struct ORSet<Element: Hashable>: NamedDeltaCRDT, ORSetOperations {
public typealias ORSetDelta = VersionedContainer<Element>.Delta
public typealias ORSetDelta = VersionedContainerDelta<Element>
public typealias Delta = ORSetDelta

public let replicaId: ReplicaId
Expand Down Expand Up @@ -104,7 +104,7 @@ extension CRDT {
// Birth dots of duplicate elements within a replica.
// e.g., suppose `elementByBirthDot` contains [(A,1): 3, (A,2): 5, (A,3): 3], then (A,1) would be added
// to this because it contains the same element (i.e., 3) as (A,3) and is older, so it can be deleted.
var birthDotsToDelete: Set<Dot<ReplicaId>> = []
var birthDotsToDelete: Set<VersionDot> = []

for birthDot in sortedBirthDots.dropFirst() {
// Replica changed - reset
Expand Down
12 changes: 5 additions & 7 deletions Sources/DistributedActors/CRDT/CRDT+Replication.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ extension CRDT {
internal enum Replicator {
static let naming: ActorNaming = "replicator"

typealias ReplicatedData = CRDT.Replication.Data

enum Message {
// The API for CRDT instance owner (e.g., actor) to call local replicator
case localCommand(LocalCommand)
Expand All @@ -53,11 +51,11 @@ extension CRDT {

enum LocalCommand: NoSerializationVerification {
// Register owner for CRDT instance
case register(ownerRef: ActorRef<CRDT.Replication.DataOwnerMessage>, id: Identity, data: ReplicatedData, replyTo: ActorRef<RegisterResult>?)
case register(ownerRef: ActorRef<CRDT.Replication.DataOwnerMessage>, id: Identity, data: AnyStateBasedCRDT, replyTo: ActorRef<RegisterResult>?)

// Perform write to at least `consistency` members
// `data` is expected to be the full CRDT. Do not send delta even if it is a delta-CRDT.
case write(_ id: Identity, _ data: ReplicatedData, consistency: OperationConsistency, replyTo: ActorRef<WriteResult>)
case write(_ id: Identity, _ data: AnyStateBasedCRDT, consistency: OperationConsistency, replyTo: ActorRef<WriteResult>)
// Perform read from at least `consistency` members
case read(_ id: Identity, consistency: OperationConsistency, replyTo: ActorRef<ReadResult>)
// Perform delete to at least `consistency` members
Expand Down Expand Up @@ -103,9 +101,9 @@ extension CRDT {

enum RemoteCommand {
// Sent from one replicator to another to write the given CRDT instance as part of `OwnerCommand.write` to meet consistency requirement
case write(_ id: Identity, _ data: ReplicatedData, replyTo: ActorRef<WriteResult>)
case write(_ id: Identity, _ data: AnyStateBasedCRDT, replyTo: ActorRef<WriteResult>)
// Sent from one replicator to another to write the given delta of delta-CRDT instance as part of `OwnerCommand.write` to meet consistency requirement
case writeDelta(_ id: Identity, delta: ReplicatedData, replyTo: ActorRef<WriteResult>)
case writeDelta(_ id: Identity, delta: AnyStateBasedCRDT, replyTo: ActorRef<WriteResult>)
// Sent from one replicator to another to read CRDT instance with the given identity as part of `OwnerCommand.read` to meet consistency requirement
case read(_ id: Identity, replyTo: ActorRef<ReadResult>)
// Sent from one replicator to another to delete CRDT instance with the given identity as part of `OwnerCommand.delete` to meet consistency requirement
Expand All @@ -125,7 +123,7 @@ extension CRDT {
}

enum ReadResult {
case success(ReplicatedData)
case success(AnyStateBasedCRDT)
case failed(ReadError)
}

Expand Down
Loading