Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Samples/swift-distributed-actors-samples.xcodeproj

/.build
/Samples/.build
Instruments/ActorInstruments/build/
/.SourceKitten
/Packages
.xcode
Expand Down
2 changes: 2 additions & 0 deletions Protos/CRDT/CRDT.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,15 @@ message CRDTORMap {
message Delta {
CRDTVersionedContainerDelta keys = 1;
repeated CRDTORMapKeyValue values = 2;
CRDTORMapValue defaultValue = 3;
}

VersionReplicaID replicaID = 1;
CRDTORSet keys = 2;
repeated CRDTORMapKeyValue values = 3;
// Delta is derived from `updatedValues`
repeated CRDTORMapKeyValue updatedValues = 4;
CRDTORMapValue defaultValue = 5;
}

// ***** CRDT.ORMultiMap *****
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ extension CRDT.ORMap: ProtobufRepresentable {
proto.keys = try self._keys.toProto(context: context)
proto.values = try ORMapSerializationUtils.valuesToProto(self._storage, context: context)
proto.updatedValues = try ORMapSerializationUtils.valuesToProto(self.updatedValues, context: context)
proto.defaultValue = try ORMapSerializationUtils.valueToProto(self.defaultValue, context: context)
return proto
}

Expand All @@ -335,7 +336,7 @@ extension CRDT.ORMap: ProtobufRepresentable {

self._storage = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context)
self.updatedValues = try ORMapSerializationUtils.valuesFromProto(proto.updatedValues, context: context)
self.defaultValue = nil // We don't need remote's default value for merge
self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context)
}
}

Expand All @@ -346,6 +347,7 @@ extension CRDT.ORMapDelta: ProtobufRepresentable {
var proto = ProtobufRepresentation()
proto.keys = try self.keys.toProto(context: context)
proto.values = try ORMapSerializationUtils.valuesToProto(self.values, context: context)
proto.defaultValue = try ORMapSerializationUtils.valueToProto(self.defaultValue, context: context)
return proto
}

Expand All @@ -356,7 +358,7 @@ extension CRDT.ORMapDelta: ProtobufRepresentable {
self.keys = try CRDT.ORSet<Key>.Delta(fromProto: proto.keys, context: context)

self.values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context)
self.defaultValue = nil // We don't need remote's default value for merge
self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context)
}
}

Expand Down
34 changes: 34 additions & 0 deletions Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,15 @@ public struct ProtoCRDTORMap {
set {_uniqueStorage()._updatedValues = newValue}
}

public var defaultValue: ProtoCRDTORMapValue {
get {return _storage._defaultValue ?? ProtoCRDTORMapValue()}
set {_uniqueStorage()._defaultValue = newValue}
}
/// Returns true if `defaultValue` has been explicitly set.
public var hasDefaultValue: Bool {return _storage._defaultValue != nil}
/// Clears the value of `defaultValue`. Subsequent reads from it will return its default value.
public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil}

public var unknownFields = SwiftProtobuf.UnknownStorage()

public struct Delta {
Expand All @@ -383,6 +392,15 @@ public struct ProtoCRDTORMap {
set {_uniqueStorage()._values = newValue}
}

public var defaultValue: ProtoCRDTORMapValue {
get {return _storage._defaultValue ?? ProtoCRDTORMapValue()}
set {_uniqueStorage()._defaultValue = newValue}
}
/// Returns true if `defaultValue` has been explicitly set.
public var hasDefaultValue: Bool {return _storage._defaultValue != nil}
/// Clears the value of `defaultValue`. Subsequent reads from it will return its default value.
public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil}

public var unknownFields = SwiftProtobuf.UnknownStorage()

public init() {}
Expand Down Expand Up @@ -1278,13 +1296,15 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
2: .same(proto: "keys"),
3: .same(proto: "values"),
4: .same(proto: "updatedValues"),
5: .same(proto: "defaultValue"),
]

fileprivate class _StorageClass {
var _replicaID: ProtoVersionReplicaID? = nil
var _keys: ProtoCRDTORSet? = nil
var _values: [ProtoCRDTORMapKeyValue] = []
var _updatedValues: [ProtoCRDTORMapKeyValue] = []
var _defaultValue: ProtoCRDTORMapValue? = nil

static let defaultInstance = _StorageClass()

Expand All @@ -1295,6 +1315,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
_keys = source._keys
_values = source._values
_updatedValues = source._updatedValues
_defaultValue = source._defaultValue
}
}

Expand All @@ -1314,6 +1335,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
case 2: try decoder.decodeSingularMessageField(value: &_storage._keys)
case 3: try decoder.decodeRepeatedMessageField(value: &_storage._values)
case 4: try decoder.decodeRepeatedMessageField(value: &_storage._updatedValues)
case 5: try decoder.decodeSingularMessageField(value: &_storage._defaultValue)
default: break
}
}
Expand All @@ -1334,6 +1356,9 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
if !_storage._updatedValues.isEmpty {
try visitor.visitRepeatedMessageField(value: _storage._updatedValues, fieldNumber: 4)
}
if let v = _storage._defaultValue {
try visitor.visitSingularMessageField(value: v, fieldNumber: 5)
}
}
try unknownFields.traverse(visitor: &visitor)
}
Expand All @@ -1347,6 +1372,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
if _storage._keys != rhs_storage._keys {return false}
if _storage._values != rhs_storage._values {return false}
if _storage._updatedValues != rhs_storage._updatedValues {return false}
if _storage._defaultValue != rhs_storage._defaultValue {return false}
return true
}
if !storagesAreEqual {return false}
Expand All @@ -1361,11 +1387,13 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [
1: .same(proto: "keys"),
2: .same(proto: "values"),
3: .same(proto: "defaultValue"),
]

fileprivate class _StorageClass {
var _keys: ProtoCRDTVersionedContainerDelta? = nil
var _values: [ProtoCRDTORMapKeyValue] = []
var _defaultValue: ProtoCRDTORMapValue? = nil

static let defaultInstance = _StorageClass()

Expand All @@ -1374,6 +1402,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
init(copying source: _StorageClass) {
_keys = source._keys
_values = source._values
_defaultValue = source._defaultValue
}
}

Expand All @@ -1391,6 +1420,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
switch fieldNumber {
case 1: try decoder.decodeSingularMessageField(value: &_storage._keys)
case 2: try decoder.decodeRepeatedMessageField(value: &_storage._values)
case 3: try decoder.decodeSingularMessageField(value: &_storage._defaultValue)
default: break
}
}
Expand All @@ -1405,6 +1435,9 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
if !_storage._values.isEmpty {
try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 2)
}
if let v = _storage._defaultValue {
try visitor.visitSingularMessageField(value: v, fieldNumber: 3)
}
}
try unknownFields.traverse(visitor: &visitor)
}
Expand All @@ -1416,6 +1449,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
let rhs_storage = _args.1
if _storage._keys != rhs_storage._keys {return false}
if _storage._values != rhs_storage._values {return false}
if _storage._defaultValue != rhs_storage._defaultValue {return false}
return true
}
if !storagesAreEqual {return false}
Expand Down
53 changes: 12 additions & 41 deletions Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ extension CRDT {

public let replicaID: ReplicaID

/// We allow `defaultValue` to be `nil` when we reconstruct `ORMap` from a remote message,
/// but it **is** required in the initializer to ensure that **local** `ORMap` has `defaultValue`
/// for `merge`, `update`, etc. In those methods `defaultValue` is required in case **local**
/// The default value for `merge`, `update`, etc. in case **local**
/// `ORMap` does not have an existing value for the given `key`.
let defaultValue: Value?
let defaultValue: Value

/// ORSet to maintain causal history of the keys only; values keep their own causal history (if applicable).
/// This is for tracking key additions and removals.
Expand Down Expand Up @@ -110,15 +108,11 @@ extension CRDT {
}

public mutating func update(key: Key, mutator: (inout Value) -> Void) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("'defaultValue' is not set. This is a bug. Please report.")
}

// Always add `key` to `_keys` set to track its causal history
self._keys.insert(key)

// Apply `mutator` to the value then save it to state. Create `Value` if needed.
var value = self._storage[key] ?? defaultValue
var value = self._storage[key] ?? self.defaultValue
mutator(&value)
self._storage[key] = value

Expand Down Expand Up @@ -161,25 +155,17 @@ extension CRDT {
}

public mutating func merge(other: ORMap<Key, Value>) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("'defaultValue' is not set. This is a bug. Please report.")
}

self._keys.merge(other: other._keys)
// Use the updated `_keys` to merge `_values` dictionaries.
// Keys that no longer exist will have their values deleted as well.
self._storage.merge(keys: self._keys.elements, other: other._storage, defaultValue: defaultValue)
self._storage.merge(keys: self._keys.elements, other: other._storage, defaultValue: self.defaultValue)
}

public mutating func mergeDelta(_ delta: Delta) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("'defaultValue' is not set. This is a bug. Please report.")
}

self._keys.mergeDelta(delta.keys)
// Use the updated `_keys` to merge `_values` dictionaries.
// Keys that no longer exist will have their values deleted as well.
self._storage.merge(keys: self._keys.elements, other: delta.values, defaultValue: defaultValue)
self._storage.merge(keys: self._keys.elements, other: delta.values, defaultValue: self.defaultValue)
}

public mutating func resetDelta() {
Expand All @@ -192,13 +178,8 @@ extension CRDT {
return false
}

switch (self.defaultValue, other.defaultValue) {
case (nil, nil):
() // continue checking
case (.some(let lhs), .some(let rhs)) where lhs.equalState(to: rhs):
() // continue checking
default:
return false // values not equal
guard self.defaultValue.equalState(to: other.defaultValue) else {
return false
}

guard self._storage.count == other._storage.count else {
Expand Down Expand Up @@ -230,10 +211,9 @@ extension CRDT {
// TODO: `merge` defined in the Dictionary extension below should use `mergeDelta` when Value is DeltaCRDT
var values: [Key: Value]

// See comment in `ORMap` on why this is optional
let defaultValue: Value?
let defaultValue: Value

init(keys: ORSet<Key>.Delta, values: [Key: Value], defaultValue: Value?) {
init(keys: ORSet<Key>.Delta, values: [Key: Value], defaultValue: Value) {
self.keys = keys
self.values = values
self.defaultValue = defaultValue
Expand All @@ -250,29 +230,20 @@ extension CRDT {
}

public mutating func merge(other: ORMapDelta<Key, Value>) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("Unable to merge [\(self)] with [\(other)] as 'defaultValue' is not set. This is a bug. Please report.")
}

// Merge `keys` first--keys that have been deleted will be gone
self.keys.merge(other: other.keys)
// Use the updated `keys` to merge `values` dictionaries.
// Keys that no longer exist will have their values deleted as well.
self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: defaultValue)
self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: self.defaultValue)
}

public func equalState(to other: StateBasedCRDT) -> Bool {
guard let other = other as? Self else {
return false
}

switch (self.defaultValue, other.defaultValue) {
case (nil, nil):
() // continue checking
case (.some(let lhs), .some(let rhs)) where lhs.equalState(to: rhs):
() // continue checking
default:
return false // values not equal
guard self.defaultValue.equalState(to: other.defaultValue) else {
return false
}

guard self.values.count == other.values.count else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extension Serialization {
case SerializerID.specializedWithTypeHint.value:
return "serializerID:specialized(\(self.value))"
case SerializerID.foundationJSON.value:
return "serializerID:jsonCodable(\(self.value))"
return "serializerID:foundationJSON(\(self.value))"
case SerializerID.foundationPropertyListBinary.value:
return "serializerID:foundationPropertyListBinary(\(self.value))"
case SerializerID.foundationPropertyListXML.value:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,35 @@ import Foundation // for Codable
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: JSON

/// Terrible workaround to automatically encode nils correctly on Swift 5.2, where Foundation encoders to not pass .allowFragments
internal protocol __OptionalProtocol {
// Terrible workaround to survive `null` values on "top level" (as perceived by the Coders, since we may call them for values in our message)
func __jsonEncode(using encoder: JSONEncoder) throws -> Data
// Terrible workaround to survive `null` values on "top level" (as perceived by the Coders, since we may call them for values in our message)
static func __jsonDecode(using decoder: JSONDecoder, from: Serialization.Buffer) throws -> Self
}

extension Optional: __OptionalProtocol where Wrapped: Codable {
func __jsonEncode(using encoder: JSONEncoder) throws -> Data {
switch self {
case .some(let value):
let data = try encoder.encode(value)
return data
case .none:
return "null".data(using: .ascii)!
}
}

static func __jsonDecode(using decoder: JSONDecoder, from bytes: Serialization.Buffer) throws -> Self {
let data = bytes.readData()
if String(data: data, encoding: .utf8) == "null" {
return nil
} else {
return try decoder.decode(Self.self, from: data)
}
}
}

/// Allows for serialization of messages using the Foundation's `JSONEncoder` and `JSONDecoder`.
///
/// - Note: Take care to ensure that both "ends" (sending and receiving members of a cluster)
Expand All @@ -35,13 +64,35 @@ internal class JSONCodableSerializer<Message: Codable>: Serializer<Message> {
}

public override func serialize(_ message: Message) throws -> Serialization.Buffer {
let data = try encoder.encode(message)
let data: Data
#if swift(>=5.3)
// It has .allowFragments set by default
data = try encoder.encode(message)
#else
// terrible hack workaround, see __OptionalProtocol for details
if let someOptional = message as? __OptionalProtocol {
data = try someOptional.__jsonEncode(using: self.encoder)
} else {
data = try self.encoder.encode(message)
}
#endif
traceLog_Serialization("serialized to: \(data)")
return .data(data)
}

public override func deserialize(from buffer: Serialization.Buffer) throws -> Message {
try self.decoder.decode(Message.self, from: buffer.readData())
let data = buffer.readData()
#if swift(>=5.3)
// It has .allowFragments set by default
return try self.decoder.decode(Message.self, from: data)
#else
// terrible hack workaround, see __OptionalProtocol for details
if let OptionalMessageType = Message.self as? __OptionalProtocol.Type {
return try OptionalMessageType.__jsonDecode(using: self.decoder, from: buffer) as! Message
} else {
return try self.decoder.decode(Message.self, from: data)
}
#endif
}

public override func setSerializationContext(_ context: Serialization.Context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import Foundation // for Codable

/// Kind of like coder / encoder, we'll provide bridges for it
// TODO: Document since users need to implement these
// TODO: could be a protocol
open class Serializer<Message> {
public init() {}

Expand Down
Loading