Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Protos/CRDT/CRDT.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,15 @@ message CRDTORMap {
message Delta {
CRDTVersionedContainerDelta keys = 1;
repeated CRDTORMapKeyValue values = 2;
CRDTORMapValue defaultValue = 3;
}

VersionReplicaID replicaID = 1;
CRDTORSet keys = 2;
repeated CRDTORMapKeyValue values = 3;
// Delta is derived from `updatedValues`
repeated CRDTORMapKeyValue updatedValues = 4;
CRDTORMapValue defaultValue = 5;
}

// ***** CRDT.ORMultiMap *****
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ extension CRDT.ORMap: ProtobufRepresentable {
proto.keys = try self._keys.toProto(context: context)
proto.values = try ORMapSerializationUtils.valuesToProto(self._storage, context: context)
proto.updatedValues = try ORMapSerializationUtils.valuesToProto(self.updatedValues, context: context)
proto.defaultValue = try ORMapSerializationUtils.valueToProto(self.defaultValue, context: context)
return proto
}

Expand All @@ -335,7 +336,7 @@ extension CRDT.ORMap: ProtobufRepresentable {

self._storage = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context)
self.updatedValues = try ORMapSerializationUtils.valuesFromProto(proto.updatedValues, context: context)
self.defaultValue = nil // We don't need remote's default value for merge
self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context)
}
}

Expand All @@ -346,6 +347,7 @@ extension CRDT.ORMapDelta: ProtobufRepresentable {
var proto = ProtobufRepresentation()
proto.keys = try self.keys.toProto(context: context)
proto.values = try ORMapSerializationUtils.valuesToProto(self.values, context: context)
proto.defaultValue = try ORMapSerializationUtils.valueToProto(self.defaultValue, context: context)
return proto
}

Expand All @@ -356,7 +358,7 @@ extension CRDT.ORMapDelta: ProtobufRepresentable {
self.keys = try CRDT.ORSet<Key>.Delta(fromProto: proto.keys, context: context)

self.values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context)
self.defaultValue = nil // We don't need remote's default value for merge
self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context)
}
}

Expand Down
34 changes: 34 additions & 0 deletions Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,15 @@ public struct ProtoCRDTORMap {
set {_uniqueStorage()._updatedValues = newValue}
}

public var defaultValue: ProtoCRDTORMapValue {
get {return _storage._defaultValue ?? ProtoCRDTORMapValue()}
set {_uniqueStorage()._defaultValue = newValue}
}
/// Returns true if `defaultValue` has been explicitly set.
public var hasDefaultValue: Bool {return _storage._defaultValue != nil}
/// Clears the value of `defaultValue`. Subsequent reads from it will return its default value.
public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil}

public var unknownFields = SwiftProtobuf.UnknownStorage()

public struct Delta {
Expand All @@ -383,6 +392,15 @@ public struct ProtoCRDTORMap {
set {_uniqueStorage()._values = newValue}
}

public var defaultValue: ProtoCRDTORMapValue {
get {return _storage._defaultValue ?? ProtoCRDTORMapValue()}
set {_uniqueStorage()._defaultValue = newValue}
}
/// Returns true if `defaultValue` has been explicitly set.
public var hasDefaultValue: Bool {return _storage._defaultValue != nil}
/// Clears the value of `defaultValue`. Subsequent reads from it will return its default value.
public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil}

public var unknownFields = SwiftProtobuf.UnknownStorage()

public init() {}
Expand Down Expand Up @@ -1278,13 +1296,15 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
2: .same(proto: "keys"),
3: .same(proto: "values"),
4: .same(proto: "updatedValues"),
5: .same(proto: "defaultValue"),
]

fileprivate class _StorageClass {
var _replicaID: ProtoVersionReplicaID? = nil
var _keys: ProtoCRDTORSet? = nil
var _values: [ProtoCRDTORMapKeyValue] = []
var _updatedValues: [ProtoCRDTORMapKeyValue] = []
var _defaultValue: ProtoCRDTORMapValue? = nil

static let defaultInstance = _StorageClass()

Expand All @@ -1295,6 +1315,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
_keys = source._keys
_values = source._values
_updatedValues = source._updatedValues
_defaultValue = source._defaultValue
}
}

Expand All @@ -1314,6 +1335,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
case 2: try decoder.decodeSingularMessageField(value: &_storage._keys)
case 3: try decoder.decodeRepeatedMessageField(value: &_storage._values)
case 4: try decoder.decodeRepeatedMessageField(value: &_storage._updatedValues)
case 5: try decoder.decodeSingularMessageField(value: &_storage._defaultValue)
default: break
}
}
Expand All @@ -1334,6 +1356,9 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
if !_storage._updatedValues.isEmpty {
try visitor.visitRepeatedMessageField(value: _storage._updatedValues, fieldNumber: 4)
}
if let v = _storage._defaultValue {
try visitor.visitSingularMessageField(value: v, fieldNumber: 5)
}
}
try unknownFields.traverse(visitor: &visitor)
}
Expand All @@ -1347,6 +1372,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement
if _storage._keys != rhs_storage._keys {return false}
if _storage._values != rhs_storage._values {return false}
if _storage._updatedValues != rhs_storage._updatedValues {return false}
if _storage._defaultValue != rhs_storage._defaultValue {return false}
return true
}
if !storagesAreEqual {return false}
Expand All @@ -1361,11 +1387,13 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [
1: .same(proto: "keys"),
2: .same(proto: "values"),
3: .same(proto: "defaultValue"),
]

fileprivate class _StorageClass {
var _keys: ProtoCRDTVersionedContainerDelta? = nil
var _values: [ProtoCRDTORMapKeyValue] = []
var _defaultValue: ProtoCRDTORMapValue? = nil

static let defaultInstance = _StorageClass()

Expand All @@ -1374,6 +1402,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
init(copying source: _StorageClass) {
_keys = source._keys
_values = source._values
_defaultValue = source._defaultValue
}
}

Expand All @@ -1391,6 +1420,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
switch fieldNumber {
case 1: try decoder.decodeSingularMessageField(value: &_storage._keys)
case 2: try decoder.decodeRepeatedMessageField(value: &_storage._values)
case 3: try decoder.decodeSingularMessageField(value: &_storage._defaultValue)
default: break
}
}
Expand All @@ -1405,6 +1435,9 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
if !_storage._values.isEmpty {
try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 2)
}
if let v = _storage._defaultValue {
try visitor.visitSingularMessageField(value: v, fieldNumber: 3)
}
}
try unknownFields.traverse(visitor: &visitor)
}
Expand All @@ -1416,6 +1449,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp
let rhs_storage = _args.1
if _storage._keys != rhs_storage._keys {return false}
if _storage._values != rhs_storage._values {return false}
if _storage._defaultValue != rhs_storage._defaultValue {return false}
return true
}
if !storagesAreEqual {return false}
Expand Down
53 changes: 12 additions & 41 deletions Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ extension CRDT {

public let replicaID: ReplicaID

/// We allow `defaultValue` to be `nil` when we reconstruct `ORMap` from a remote message,
/// but it **is** required in the initializer to ensure that **local** `ORMap` has `defaultValue`
/// for `merge`, `update`, etc. In those methods `defaultValue` is required in case **local**
/// The default value for `merge`, `update`, etc. in case **local**
/// `ORMap` does not have an existing value for the given `key`.
let defaultValue: Value?
let defaultValue: Value

/// ORSet to maintain causal history of the keys only; values keep their own causal history (if applicable).
/// This is for tracking key additions and removals.
Expand Down Expand Up @@ -110,15 +108,11 @@ extension CRDT {
}

public mutating func update(key: Key, mutator: (inout Value) -> Void) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("'defaultValue' is not set. This is a bug. Please report.")
}

// Always add `key` to `_keys` set to track its causal history
self._keys.insert(key)

// Apply `mutator` to the value then save it to state. Create `Value` if needed.
var value = self._storage[key] ?? defaultValue
var value = self._storage[key] ?? self.defaultValue
mutator(&value)
self._storage[key] = value

Expand Down Expand Up @@ -161,25 +155,17 @@ extension CRDT {
}

public mutating func merge(other: ORMap<Key, Value>) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("'defaultValue' is not set. This is a bug. Please report.")
}

self._keys.merge(other: other._keys)
// Use the updated `_keys` to merge `_values` dictionaries.
// Keys that no longer exist will have their values deleted as well.
self._storage.merge(keys: self._keys.elements, other: other._storage, defaultValue: defaultValue)
self._storage.merge(keys: self._keys.elements, other: other._storage, defaultValue: self.defaultValue)
}

public mutating func mergeDelta(_ delta: Delta) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("'defaultValue' is not set. This is a bug. Please report.")
}

self._keys.mergeDelta(delta.keys)
// Use the updated `_keys` to merge `_values` dictionaries.
// Keys that no longer exist will have their values deleted as well.
self._storage.merge(keys: self._keys.elements, other: delta.values, defaultValue: defaultValue)
self._storage.merge(keys: self._keys.elements, other: delta.values, defaultValue: self.defaultValue)
}

public mutating func resetDelta() {
Expand All @@ -192,13 +178,8 @@ extension CRDT {
return false
}

switch (self.defaultValue, other.defaultValue) {
case (nil, nil):
() // continue checking
case (.some(let lhs), .some(let rhs)) where lhs.equalState(to: rhs):
() // continue checking
default:
return false // values not equal
guard self.defaultValue.equalState(to: other.defaultValue) else {
return false
}

guard self._storage.count == other._storage.count else {
Expand Down Expand Up @@ -230,10 +211,9 @@ extension CRDT {
// TODO: `merge` defined in the Dictionary extension below should use `mergeDelta` when Value is DeltaCRDT
var values: [Key: Value]

// See comment in `ORMap` on why this is optional
let defaultValue: Value?
let defaultValue: Value

init(keys: ORSet<Key>.Delta, values: [Key: Value], defaultValue: Value?) {
init(keys: ORSet<Key>.Delta, values: [Key: Value], defaultValue: Value) {
self.keys = keys
self.values = values
self.defaultValue = defaultValue
Expand All @@ -250,29 +230,20 @@ extension CRDT {
}

public mutating func merge(other: ORMapDelta<Key, Value>) {
guard let defaultValue = self.defaultValue else {
preconditionFailure("Unable to merge [\(self)] with [\(other)] as 'defaultValue' is not set. This is a bug. Please report.")
}

// Merge `keys` first--keys that have been deleted will be gone
self.keys.merge(other: other.keys)
// Use the updated `keys` to merge `values` dictionaries.
// Keys that no longer exist will have their values deleted as well.
self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: defaultValue)
self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: self.defaultValue)
}

public func equalState(to other: StateBasedCRDT) -> Bool {
guard let other = other as? Self else {
return false
}

switch (self.defaultValue, other.defaultValue) {
case (nil, nil):
() // continue checking
case (.some(let lhs), .some(let rhs)) where lhs.equalState(to: rhs):
() // continue checking
default:
return false // values not equal
guard self.defaultValue.equalState(to: other.defaultValue) else {
return false
}

guard self.values.count == other.values.count else {
Expand Down
Loading