From cc9117aa2c62137be9bbe478cb43c3d0262f91f8 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 28 Apr 2020 15:44:51 -0700 Subject: [PATCH 1/5] CRDT.ORMap serialization #507 Motivation: `CRDT.ORMap` does not conform to `ProtobufRepresentable` Modiifications: - Modified `ORMap` so it doesn't require a closure in initializer - Conform `ORMap` and `ORMapDelta` to `ProtobufRepresentable` Result: Part of https://github.com/apple/swift-distributed-actors/issues/507 --- Protos/CRDT/CRDT.proto | 32 ++ .../CRDT/Protobuf/CRDT+Serialization.swift | 127 ++++ .../CRDT/Protobuf/CRDT.pb.swift | 544 ++++++++++++++++++ .../CRDT/Types/CRDT+GCounter.swift | 12 + .../CRDT/Types/CRDT+LWWMap.swift | 31 +- .../CRDT/Types/CRDT+LWWRegister.swift | 14 + .../CRDT/Types/CRDT+ORMap.swift | 70 ++- .../CRDT/Types/CRDT+ORMultiMap.swift | 15 +- .../CRDT/Types/CRDT+ORSet.swift | 11 + .../CRDT/Types/CRDT+StateBased.swift | 5 + .../Serialization+SerializerID.swift | 2 + .../CRDT/CRDTActorOwnedTests.swift | 2 +- .../Protobuf/CRDT+SerializationTests.swift | 67 +++ .../CRDT/Types/CRDTGCounterTests.swift | 23 +- .../CRDT/Types/CRDTLWWMapTests.swift | 29 +- .../CRDT/Types/CRDTLWWRegisterTests.swift | 25 +- .../CRDT/Types/CRDTORMapTests.swift | 53 +- .../CRDT/Types/CRDTORMultiMapTests.swift | 27 +- .../CRDT/Types/CRDTORSetTests.swift | 24 +- 19 files changed, 1017 insertions(+), 96 deletions(-) diff --git a/Protos/CRDT/CRDT.proto b/Protos/CRDT/CRDT.proto index 234099ebf..383be9401 100644 --- a/Protos/CRDT/CRDT.proto +++ b/Protos/CRDT/CRDT.proto @@ -77,6 +77,38 @@ message CRDTORSet { CRDTVersionedContainer state = 2; } +// ***** CRDT.ORMap ***** + +message CRDTORMapKey { + Manifest manifest = 1; + bytes payload = 2; +} + +message CRDTORMapValue { + Manifest manifest = 1; + bytes payload = 2; +} + +message CRDTORMapKeyValue { + CRDTORMapKey key = 1; + CRDTORMapValue value = 2; +} + +message CRDTORMap { + message Delta { + CRDTORMapValue defaultValue = 1; + CRDTVersionedContainerDelta keys = 2; + repeated CRDTORMapKeyValue values = 3; + } + + VersionReplicaID replicaID = 1; + CRDTORMapValue defaultValue = 2; + CRDTORSet keys = 3; + repeated CRDTORMapKeyValue values = 4; + // Delta is derived from `updatedValues` + repeated CRDTORMapKeyValue updatedValues = 5; +} + // ***** CRDT.LWWRegister ***** message CRDTLWWRegister { diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift index 2cda44e21..9773f5d3b 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift @@ -247,6 +247,133 @@ extension CRDT.ORSet: ProtobufRepresentable { } } +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: CRDT.ORMap + +private enum ORMapSerializationUtils { + static func keyToProto(_ key: Key, context: Serialization.Context) throws -> ProtoCRDTORMapKey { + let serialized = try context.serialization.serialize(key) + var proto = ProtoCRDTORMapKey() + proto.manifest = try serialized.manifest.toProto(context: context) + proto.payload = serialized.buffer.readData() + return proto + } + + static func keyFromProto(_ proto: ProtoCRDTORMapKey, context: Serialization.Context) throws -> Key { + try context.serialization.deserialize( + as: Key.self, + from: .data(proto.payload), + using: Serialization.Manifest(fromProto: proto.manifest, context: context) + ) + } + + static func valueToProto(_ value: Value, context: Serialization.Context) throws -> ProtoCRDTORMapValue { + let serialized = try context.serialization.serialize(value) + var proto = ProtoCRDTORMapValue() + proto.manifest = try serialized.manifest.toProto(context: context) + proto.payload = serialized.buffer.readData() + return proto + } + + static func valueFromProto(_ proto: ProtoCRDTORMapValue, context: Serialization.Context) throws -> Value { + try context.serialization.deserialize( + as: Value.self, + from: .data(proto.payload), + using: Serialization.Manifest(fromProto: proto.manifest, context: context) + ) + } + + static func valuesToProto(_ values: [Key: Value], context: Serialization.Context) throws -> [ProtoCRDTORMapKeyValue] { + try values.map { key, value in + var proto = ProtoCRDTORMapKeyValue() + proto.key = try keyToProto(key, context: context) + proto.value = try valueToProto(value, context: context) + return proto + } + } + + static func valuesFromProto(_ proto: [ProtoCRDTORMapKeyValue], context: Serialization.Context) throws -> [Key: Value] { + try proto.reduce(into: [Key: Value]()) { result, protoKeyValue in + guard protoKeyValue.hasKey else { + throw SerializationError.missingField("key", type: String(describing: [Key: Value].self)) + } + let key: Key = try keyFromProto(protoKeyValue.key, context: context) + let value: Value = try valueFromProto(protoKeyValue.value, context: context) + result[key] = value + } + } +} + +extension CRDT.ORMap: ProtobufRepresentable { + public typealias ProtobufRepresentation = ProtoCRDTORMap + + public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { + var proto = ProtobufRepresentation() + proto.replicaID = try self.replicaID.toProto(context: context) + + let serializedDefaultValue = try context.serialization.serialize(self.defaultValue) + proto.defaultValue.manifest = try serializedDefaultValue.manifest.toProto(context: context) + proto.defaultValue.payload = serializedDefaultValue.buffer.readData() + + proto.keys = try self._keys.toProto(context: context) + proto.values = try ORMapSerializationUtils.valuesToProto(self._values, context: context) + proto.updatedValues = try ORMapSerializationUtils.valuesToProto(self.updatedValues, context: context) + + return proto + } + + public init(fromProto proto: ProtoCRDTORMap, context: Serialization.Context) throws { + guard proto.hasReplicaID else { + throw SerializationError.missingField("replicaID", type: String(describing: CRDT.ORMap.self)) + } + self.replicaID = try ReplicaID(fromProto: proto.replicaID, context: context) + + guard proto.hasDefaultValue else { + throw SerializationError.missingField("defaultValue", type: String(describing: CRDT.ORMap.self)) + } + self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context) + + guard proto.hasKeys else { + throw SerializationError.missingField("keys", type: String(describing: CRDT.ORMap.self)) + } + self._keys = try CRDT.ORSet(fromProto: proto.keys, context: context) + + self._values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context) + self.updatedValues = try ORMapSerializationUtils.valuesFromProto(proto.updatedValues, context: context) + } +} + +extension CRDT.ORMapDelta: ProtobufRepresentable { + public typealias ProtobufRepresentation = ProtoCRDTORMap.Delta + + public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { + var proto = ProtobufRepresentation() + + let serializedDefaultValue = try context.serialization.serialize(self.defaultValue) + proto.defaultValue.manifest = try serializedDefaultValue.manifest.toProto(context: context) + proto.defaultValue.payload = serializedDefaultValue.buffer.readData() + + proto.keys = try self.keys.toProto(context: context) + proto.values = try ORMapSerializationUtils.valuesToProto(self.values, context: context) + + return proto + } + + public init(fromProto proto: ProtobufRepresentation, context: Serialization.Context) throws { + guard proto.hasDefaultValue else { + throw SerializationError.missingField("defaultValue", type: String(describing: CRDT.ORMapDelta.self)) + } + self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context) + + guard proto.hasKeys else { + throw SerializationError.missingField("keys", type: String(describing: CRDT.ORMapDelta.self)) + } + self.keys = try CRDT.ORSet.Delta(fromProto: proto.keys, context: context) + + self.values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: CRDT.LWWRegister diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift index 215065018..6afdcbbf2 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift @@ -246,6 +246,173 @@ public struct ProtoCRDTORSet { fileprivate var _storage = _StorageClass.defaultInstance } +public struct ProtoCRDTORMapKey { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var manifest: ProtoManifest { + get {return _storage._manifest ?? ProtoManifest()} + set {_uniqueStorage()._manifest = newValue} + } + /// Returns true if `manifest` has been explicitly set. + public var hasManifest: Bool {return _storage._manifest != nil} + /// Clears the value of `manifest`. Subsequent reads from it will return its default value. + public mutating func clearManifest() {_uniqueStorage()._manifest = nil} + + public var payload: Data { + get {return _storage._payload} + set {_uniqueStorage()._payload = newValue} + } + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance +} + +public struct ProtoCRDTORMapValue { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var manifest: ProtoManifest { + get {return _storage._manifest ?? ProtoManifest()} + set {_uniqueStorage()._manifest = newValue} + } + /// Returns true if `manifest` has been explicitly set. + public var hasManifest: Bool {return _storage._manifest != nil} + /// Clears the value of `manifest`. Subsequent reads from it will return its default value. + public mutating func clearManifest() {_uniqueStorage()._manifest = nil} + + public var payload: Data { + get {return _storage._payload} + set {_uniqueStorage()._payload = newValue} + } + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance +} + +public struct ProtoCRDTORMapKeyValue { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var key: ProtoCRDTORMapKey { + get {return _storage._key ?? ProtoCRDTORMapKey()} + set {_uniqueStorage()._key = newValue} + } + /// Returns true if `key` has been explicitly set. + public var hasKey: Bool {return _storage._key != nil} + /// Clears the value of `key`. Subsequent reads from it will return its default value. + public mutating func clearKey() {_uniqueStorage()._key = nil} + + public var value: ProtoCRDTORMapValue { + get {return _storage._value ?? ProtoCRDTORMapValue()} + set {_uniqueStorage()._value = newValue} + } + /// Returns true if `value` has been explicitly set. + public var hasValue: Bool {return _storage._value != nil} + /// Clears the value of `value`. Subsequent reads from it will return its default value. + public mutating func clearValue() {_uniqueStorage()._value = nil} + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance +} + +public struct ProtoCRDTORMap { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var replicaID: ProtoVersionReplicaID { + get {return _storage._replicaID ?? ProtoVersionReplicaID()} + set {_uniqueStorage()._replicaID = newValue} + } + /// Returns true if `replicaID` has been explicitly set. + public var hasReplicaID: Bool {return _storage._replicaID != nil} + /// Clears the value of `replicaID`. Subsequent reads from it will return its default value. + public mutating func clearReplicaID() {_uniqueStorage()._replicaID = nil} + + public var defaultValue: ProtoCRDTORMapValue { + get {return _storage._defaultValue ?? ProtoCRDTORMapValue()} + set {_uniqueStorage()._defaultValue = newValue} + } + /// Returns true if `defaultValue` has been explicitly set. + public var hasDefaultValue: Bool {return _storage._defaultValue != nil} + /// Clears the value of `defaultValue`. Subsequent reads from it will return its default value. + public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil} + + public var keys: ProtoCRDTORSet { + get {return _storage._keys ?? ProtoCRDTORSet()} + set {_uniqueStorage()._keys = newValue} + } + /// Returns true if `keys` has been explicitly set. + public var hasKeys: Bool {return _storage._keys != nil} + /// Clears the value of `keys`. Subsequent reads from it will return its default value. + public mutating func clearKeys() {_uniqueStorage()._keys = nil} + + public var values: [ProtoCRDTORMapKeyValue] { + get {return _storage._values} + set {_uniqueStorage()._values = newValue} + } + + /// Delta is derived from `updatedValues` + public var updatedValues: [ProtoCRDTORMapKeyValue] { + get {return _storage._updatedValues} + set {_uniqueStorage()._updatedValues = newValue} + } + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public struct Delta { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var defaultValue: ProtoCRDTORMapValue { + get {return _storage._defaultValue ?? ProtoCRDTORMapValue()} + set {_uniqueStorage()._defaultValue = newValue} + } + /// Returns true if `defaultValue` has been explicitly set. + public var hasDefaultValue: Bool {return _storage._defaultValue != nil} + /// Clears the value of `defaultValue`. Subsequent reads from it will return its default value. + public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil} + + public var keys: ProtoCRDTVersionedContainerDelta { + get {return _storage._keys ?? ProtoCRDTVersionedContainerDelta()} + set {_uniqueStorage()._keys = newValue} + } + /// Returns true if `keys` has been explicitly set. + public var hasKeys: Bool {return _storage._keys != nil} + /// Clears the value of `keys`. Subsequent reads from it will return its default value. + public mutating func clearKeys() {_uniqueStorage()._keys = nil} + + public var values: [ProtoCRDTORMapKeyValue] { + get {return _storage._values} + set {_uniqueStorage()._values = newValue} + } + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance + } + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance +} + public struct ProtoCRDTLWWRegister { // SwiftProtobuf.Message conformance is added in an extension below. See the // `Message` and `Message+*Additions` files in the SwiftProtobuf library for @@ -853,6 +1020,383 @@ extension ProtoCRDTORSet: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement } } +extension ProtoCRDTORMapKey: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "CRDTORMapKey" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "manifest"), + 2: .same(proto: "payload"), + ] + + fileprivate class _StorageClass { + var _manifest: ProtoManifest? = nil + var _payload: Data = SwiftProtobuf.Internal.emptyData + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _manifest = source._manifest + _payload = source._payload + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._manifest) + case 2: try decoder.decodeSingularBytesField(value: &_storage._payload) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._manifest { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if !_storage._payload.isEmpty { + try visitor.visitSingularBytesField(value: _storage._payload, fieldNumber: 2) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTORMapKey, rhs: ProtoCRDTORMapKey) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._manifest != rhs_storage._manifest {return false} + if _storage._payload != rhs_storage._payload {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension ProtoCRDTORMapValue: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "CRDTORMapValue" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "manifest"), + 2: .same(proto: "payload"), + ] + + fileprivate class _StorageClass { + var _manifest: ProtoManifest? = nil + var _payload: Data = SwiftProtobuf.Internal.emptyData + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _manifest = source._manifest + _payload = source._payload + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._manifest) + case 2: try decoder.decodeSingularBytesField(value: &_storage._payload) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._manifest { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if !_storage._payload.isEmpty { + try visitor.visitSingularBytesField(value: _storage._payload, fieldNumber: 2) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTORMapValue, rhs: ProtoCRDTORMapValue) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._manifest != rhs_storage._manifest {return false} + if _storage._payload != rhs_storage._payload {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension ProtoCRDTORMapKeyValue: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "CRDTORMapKeyValue" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "key"), + 2: .same(proto: "value"), + ] + + fileprivate class _StorageClass { + var _key: ProtoCRDTORMapKey? = nil + var _value: ProtoCRDTORMapValue? = nil + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _key = source._key + _value = source._value + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._key) + case 2: try decoder.decodeSingularMessageField(value: &_storage._value) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._key { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if let v = _storage._value { + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTORMapKeyValue, rhs: ProtoCRDTORMapKeyValue) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._key != rhs_storage._key {return false} + if _storage._value != rhs_storage._value {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "CRDTORMap" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "replicaID"), + 2: .same(proto: "defaultValue"), + 3: .same(proto: "keys"), + 4: .same(proto: "values"), + 5: .same(proto: "updatedValues"), + ] + + fileprivate class _StorageClass { + var _replicaID: ProtoVersionReplicaID? = nil + var _defaultValue: ProtoCRDTORMapValue? = nil + var _keys: ProtoCRDTORSet? = nil + var _values: [ProtoCRDTORMapKeyValue] = [] + var _updatedValues: [ProtoCRDTORMapKeyValue] = [] + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _replicaID = source._replicaID + _defaultValue = source._defaultValue + _keys = source._keys + _values = source._values + _updatedValues = source._updatedValues + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._replicaID) + case 2: try decoder.decodeSingularMessageField(value: &_storage._defaultValue) + case 3: try decoder.decodeSingularMessageField(value: &_storage._keys) + case 4: try decoder.decodeRepeatedMessageField(value: &_storage._values) + case 5: try decoder.decodeRepeatedMessageField(value: &_storage._updatedValues) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._replicaID { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if let v = _storage._defaultValue { + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + } + if let v = _storage._keys { + try visitor.visitSingularMessageField(value: v, fieldNumber: 3) + } + if !_storage._values.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 4) + } + if !_storage._updatedValues.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._updatedValues, fieldNumber: 5) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTORMap, rhs: ProtoCRDTORMap) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._replicaID != rhs_storage._replicaID {return false} + if _storage._defaultValue != rhs_storage._defaultValue {return false} + if _storage._keys != rhs_storage._keys {return false} + if _storage._values != rhs_storage._values {return false} + if _storage._updatedValues != rhs_storage._updatedValues {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = ProtoCRDTORMap.protoMessageName + ".Delta" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "defaultValue"), + 2: .same(proto: "keys"), + 3: .same(proto: "values"), + ] + + fileprivate class _StorageClass { + var _defaultValue: ProtoCRDTORMapValue? = nil + var _keys: ProtoCRDTVersionedContainerDelta? = nil + var _values: [ProtoCRDTORMapKeyValue] = [] + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _defaultValue = source._defaultValue + _keys = source._keys + _values = source._values + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._defaultValue) + case 2: try decoder.decodeSingularMessageField(value: &_storage._keys) + case 3: try decoder.decodeRepeatedMessageField(value: &_storage._values) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._defaultValue { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if let v = _storage._keys { + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + } + if !_storage._values.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 3) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTORMap.Delta, rhs: ProtoCRDTORMap.Delta) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._defaultValue != rhs_storage._defaultValue {return false} + if _storage._keys != rhs_storage._keys {return false} + if _storage._values != rhs_storage._values {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + extension ProtoCRDTLWWRegister: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = "CRDTLWWRegister" public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift b/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift index 2435db339..d35bd4d28 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift @@ -128,6 +128,18 @@ extension CRDT.GCounter: ResettableCRDT { } } +extension CRDT.GCounter: CloneableCRDT { + private init(replicaID: ReplicaID, state: [ReplicaID: Int], delta: Delta?) { + self.replicaID = replicaID + self.state = state + self.delta = delta + } + + public func clone() -> CRDT.GCounter { + CRDT.GCounter(replicaID: self.replicaID, state: self.state, delta: self.delta) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned GCounter diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift index 3b88f96e1..917d5209e 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift @@ -22,7 +22,7 @@ extension CRDT { /// - SeeAlso: Akka's [`LWWMap`](https://github.com/akka/akka/blob/master/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala) /// - SeeAlso: `CRDT.ORMap` /// - SeeAlso: `CRDT.LWWRegister` - public struct LWWMap: NamedDeltaCRDT, LWWMapOperations { + public struct LWWMap: NamedDeltaCRDT, LWWMapOperations { public typealias Delta = ORMapDelta> public let replicaID: ReplicaID @@ -56,16 +56,14 @@ extension CRDT { init(replicaID: ReplicaID, defaultValue: Value) { self.replicaID = replicaID - self.state = .init(replicaID: replicaID) { - // This is relevant only in `ORMap.merge`, when `key` exists in `other` but not `self` and therefore we - // must create a "zero" value before merging `other` into it. - // The "zero" value's timestamp must happen-before `other`'s to allow `other` to win. If we just - // use the current time here `other` would never win. - // We don't need to worry about the usage of this and timestamp being too new in `ORMap.update` because - // a call to `LWWRegister.assign` immediately follows and the value is updated without comparing - // timestamps. - LWWRegister(replicaID: replicaID, initialValue: defaultValue, clock: WallTimeClock.zero) - } + // `defaultValue` is relevant only in `ORMap.merge`, when `key` exists in `other` but not `self` + // and therefore we must create a "zero" value before merging `other` into it. + // The "zero" value's timestamp must happen-before `other`'s to allow `other` to win. If we just + // use the current time here `other` would never win. + // We don't need to worry about the usage of this and timestamp being too new in `ORMap.update` + // because a call to `LWWRegister.assign` immediately follows and the value is updated without + // comparing timestamps. + self.state = .init(replicaID: replicaID, defaultValue: LWWRegister(replicaID: replicaID, initialValue: defaultValue, clock: WallTimeClock.zero)) } /// Gets the value, if any, associated with `key`. @@ -126,6 +124,17 @@ extension CRDT { } } +extension CRDT.LWWMap: CloneableCRDT { + private init(replicaID: ReplicaID, state: CRDT.ORMap>) { + self.replicaID = replicaID + self.state = state + } + + public func clone() -> CRDT.LWWMap { + CRDT.LWWMap(replicaID: self.replicaID, state: state) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned LWWMap diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift b/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift index 6b9c713bc..014189b35 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift @@ -83,6 +83,20 @@ extension CRDT.LWWRegister: ResettableCRDT { } } +extension CRDT.LWWRegister: CloneableCRDT { + private init(replicaID: ReplicaID, initialValue: Value, value: Value, clock: WallTimeClock, updatedBy: ReplicaID) { + self.replicaID = replicaID + self.initialValue = initialValue + self.value = value + self.clock = clock + self.updatedBy = updatedBy + } + + public func clone() -> CRDT.LWWRegister { + CRDT.LWWRegister(replicaID: self.replicaID, initialValue: self.initialValue, value: self.value, clock: self.clock, updatedBy: self.updatedBy) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned LWWRegister diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift index 4429ebf81..83f608f19 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift @@ -51,15 +51,12 @@ extension CRDT { /// /// - SeeAlso: [Delta State Replicated Data Types](https://arxiv.org/pdf/1603.01529.pdf) /// - SeeAlso: `CRDT.ORSet` - public struct ORMap: NamedDeltaCRDT, ORMapOperations { + public struct ORMap: NamedDeltaCRDT, ORMapOperations { public typealias Delta = ORMapDelta public let replicaID: ReplicaID - /// Creates a new `Value` instance. e.g., zero counter, empty set, etc. - /// The initializer should not close over mutable state as no strong guarantees are provided about - /// which context it will execute on. - private let valueInitializer: () -> Value + let defaultValue: Value /// ORSet to maintain causal history of the keys only; values keep their own causal history (if applicable). /// This is for tracking key additions and removals. @@ -73,7 +70,7 @@ extension CRDT { public var delta: Delta? { // `_keys` should always be mutated whenever `self` is modified in any way. if let keysDelta = self._keys.delta { - return ORMapDelta(keys: keysDelta, values: self.updatedValues, valueInitializer: self.valueInitializer) + return ORMapDelta(keys: keysDelta, values: self.updatedValues, defaultValue: self.defaultValue) } // If `_keys` has not been mutated then assume `self` has not been modified either. return nil @@ -99,9 +96,9 @@ extension CRDT { self._values.isEmpty } - init(replicaID: ReplicaID, valueInitializer: @escaping () -> Value) { + init(replicaID: ReplicaID, defaultValue: Value) { self.replicaID = replicaID - self.valueInitializer = valueInitializer + self.defaultValue = defaultValue self._keys = ORSet(replicaID: replicaID) self._values = [:] } @@ -111,7 +108,7 @@ extension CRDT { self._keys.add(key) // Apply `mutator` to the value then save it to state. Create `Value` if needed. - var value = self._values[key] ?? self.valueInitializer() + var value = self._values[key] ?? self.defaultValue.clone() mutator(&value) self._values[key] = value @@ -157,43 +154,34 @@ extension CRDT { self._keys.merge(other: other._keys) // Use the updated `_keys` to merge `_values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self._values.merge(keys: self._keys.elements, other: other._values, valueInitializer: self.valueInitializer) + self._values.merge(keys: self._keys.elements, other: other._values, defaultValue: self.defaultValue) } public mutating func mergeDelta(_ delta: Delta) { self._keys.mergeDelta(delta.keys) // Use the updated `_keys` to merge `_values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self._values.merge(keys: self._keys.elements, other: delta.values, valueInitializer: self.valueInitializer) + self._values.merge(keys: self._keys.elements, other: delta.values, defaultValue: self.defaultValue) } public mutating func resetDelta() { self._keys.resetDelta() self.updatedValues.removeAll() } - - public init(from decoder: Decoder) throws { - fatalError("TODO: implement serialization of ORMap") // FIXME: crdt serialization - } - - public func encode(to encoder: Encoder) throws { - fatalError("TODO: implement serialization of ORMap") // FIXME: crdt serialization - } } - public struct ORMapDelta: CvRDT { + public struct ORMapDelta: CvRDT { var keys: ORSet.Delta // TODO: potential optimization: send only the delta if Value is DeltaCRDT. i.e., instead of Value here we would use Value.Delta // TODO: `merge` defined in the Dictionary extension below should use `mergeDelta` when Value is DeltaCRDT var values: [Key: Value] - // FIXME: this is not serializable -- need to pick something rather than depend on the fn? - private let valueInitializer: () -> Value + let defaultValue: Value - init(keys: ORSet.Delta, values: [Key: Value], valueInitializer: @escaping () -> Value) { + init(keys: ORSet.Delta, values: [Key: Value], defaultValue: Value) { self.keys = keys self.values = values - self.valueInitializer = valueInitializer + self.defaultValue = defaultValue } public mutating func _tryMerge(other: StateBasedCRDT) -> CRDT.MergeError? { @@ -211,21 +199,13 @@ extension CRDT { self.keys.merge(other: other.keys) // Use the updated `keys` to merge `values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self.values.merge(keys: self.keys.elements, other: other.values, valueInitializer: self.valueInitializer) - } - - public init(from decoder: Decoder) throws { - fatalError("TODO: implement serialization of ORMapDelta") // FIXME: crdt serialization - } - - public func encode(to encoder: Encoder) throws { - fatalError("TODO: implement serialization of ORMapDelta") // FIXME: crdt serialization + self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: self.defaultValue) } } } -extension Dictionary where Key: Hashable, Value: CvRDT { - internal mutating func merge(keys: Set, other: [Key: Value], valueInitializer: () -> Value) { +extension Dictionary where Key: Hashable, Value: CvRDT & CloneableCRDT { + internal mutating func merge(keys: Set, other: [Key: Value], defaultValue: Value) { // Remove from `self` and `other` keys that no longer exist self = self.filter { k, _ in keys.contains(k) } let other = other.filter { k, _ in keys.contains(k) } @@ -234,7 +214,7 @@ extension Dictionary where Key: Hashable, Value: CvRDT { for (k, rv) in other { // If `k` is not found in `self` then create a new `Value` instance. // We must NOT copy `other`'s value directly to `self` because the two should have different replica IDs. - var lv: Value = self[k] ?? valueInitializer() + var lv: Value = self[k] ?? defaultValue.clone() lv.merge(other: rv) self[k] = lv } @@ -261,6 +241,20 @@ extension CRDT.ORMap: ORMapWithResettableValue where Value: ResettableCRDT { } } +extension CRDT.ORMap: CloneableCRDT { + private init(replicaID: ReplicaID, defaultValue: Value, keys: CRDT.ORSet, values: [Key: Value], updatedValues: [Key: Value]) { + self.replicaID = replicaID + self.defaultValue = defaultValue + self._keys = keys + self._values = values + self.updatedValues = updatedValues + } + + public func clone() -> CRDT.ORMap { + CRDT.ORMap(replicaID: self.replicaID, defaultValue: self.defaultValue, keys: self._keys, values: self._values, updatedValues: self.updatedValues) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned - Common protocols and extensions for generic and specialized ORMap types (e.g., ORMap, LWWMap) @@ -348,8 +342,8 @@ extension CRDT.ActorOwned where DataType: ORMapOperations { } extension CRDT.ORMap { - public static func owned(by owner: ActorContext, id: String, valueInitializer: @escaping () -> Value) -> CRDT.ActorOwned> { - CRDT.ActorOwned(ownerContext: owner, id: CRDT.Identity(id), data: CRDT.ORMap(replicaID: .actorAddress(owner.address), valueInitializer: valueInitializer)) + public static func owned(by owner: ActorContext, id: String, defaultValue: Value) -> CRDT.ActorOwned> { + CRDT.ActorOwned(ownerContext: owner, id: CRDT.Identity(id), data: CRDT.ORMap(replicaID: .actorAddress(owner.address), defaultValue: defaultValue)) } } diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift index 6597eb191..e15c169f7 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift @@ -55,9 +55,7 @@ extension CRDT { init(replicaID: ReplicaID) { self.replicaID = replicaID - self.state = .init(replicaID: replicaID) { - ORSet(replicaID: replicaID) - } + self.state = .init(replicaID: replicaID, defaultValue: ORSet(replicaID: replicaID)) } /// Gets the set of values, if any, associated with `key`. @@ -122,6 +120,17 @@ extension CRDT { } } +extension CRDT.ORMultiMap: CloneableCRDT { + private init(replicaID: ReplicaID, state: CRDT.ORMap>) { + self.replicaID = replicaID + self.state = state + } + + public func clone() -> CRDT.ORMultiMap { + CRDT.ORMultiMap(replicaID: self.replicaID, state: self.state) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned ORMultiMap diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift index dd7f8aa93..4fa19ac76 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift @@ -155,6 +155,17 @@ extension CRDT.ORSet: ResettableCRDT { } } +extension CRDT.ORSet: CloneableCRDT { + private init(replicaID: ReplicaID, state: CRDT.VersionedContainer) { + self.replicaID = replicaID + self.state = state + } + + public func clone() -> CRDT.ORSet { + CRDT.ORSet(replicaID: self.replicaID, state: self.state) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned ORSet diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift b/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift index dd31e6482..28c0a03fc 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift @@ -133,3 +133,8 @@ public protocol NamedDeltaCRDT: DeltaCRDT { public protocol ResettableCRDT { mutating func reset() } + +/// CRDT that can be cloned +public protocol CloneableCRDT { + func clone() -> Self +} diff --git a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift index cf20478f4..fa40dab91 100644 --- a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift +++ b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift @@ -114,6 +114,8 @@ extension Serialization { internal static let CRDTGCounterDelta: SerializerID = .protobufRepresentable internal static let CRDTORSet: SerializerID = .protobufRepresentable internal static let CRDTORSetDelta: SerializerID = .protobufRepresentable + internal static let CRDTORMap: SerializerID = .protobufRepresentable + internal static let CRDTORMapDelta: SerializerID = .protobufRepresentable internal static let CRDTLWWRegister: SerializerID = .protobufRepresentable internal static let ConvergentGossipMembership: SerializerID = .foundationJSON diff --git a/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift b/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift index 15db8bc04..4867e12d8 100644 --- a/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift +++ b/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift @@ -338,7 +338,7 @@ final class CRDTActorOwnedTests: ActorSystemTestBase { private func actorOwnedORMapBehavior(id: String, oep ownerEventProbe: ActorRef) -> Behavior { .setup { context in - let m = CRDT.ORMap.owned(by: context, id: id, valueInitializer: { CRDT.GCounter(replicaID: .actorAddress(context.address)) }) + let m = CRDT.ORMap.owned(by: context, id: id, defaultValue: CRDT.GCounter(replicaID: .actorAddress(context.address))) m.onUpdate { id, mm in context.log.trace("ORMap \(id) updated with new value: \(mm.underlying)") ownerEventProbe.tell(.ownerDefinedOnUpdate) diff --git a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift index 0783db020..df13743ed 100644 --- a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift @@ -25,6 +25,8 @@ final class CRDTSerializationTests: ActorSystemTestBase { // TODO: all this registering will go away with _mangledTypeName settings.serialization.register(CRDT.ORSet.self, serializerID: Serialization.ReservedID.CRDTORSet) settings.serialization.register(CRDT.ORSet.Delta.self, serializerID: Serialization.ReservedID.CRDTORSetDelta) + settings.serialization.register(CRDT.ORMap>.self, serializerID: Serialization.ReservedID.CRDTORMap) + settings.serialization.register(CRDT.ORMap>.Delta.self, serializerID: Serialization.ReservedID.CRDTORMapDelta) settings.serialization.register(CRDT.LWWRegister.self, serializerID: Serialization.ReservedID.CRDTLWWRegister) } } @@ -211,6 +213,71 @@ final class CRDTSerializationTests: ActorSystemTestBase { } } + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: ORMap + + func test_serializationOf_ORMap() throws { + try shouldNotThrow { + var map = CRDT.ORMap>(replicaID: .actorAddress(self.ownerAlpha), defaultValue: CRDT.ORSet(replicaID: .actorAddress(self.ownerAlpha))) + map.update(key: "s1") { $0.add("a") } + map.update(key: "s2") { $0.add("b") } + map.update(key: "s1") { $0.add("c") } + map.delta.shouldNotBeNil() + + let serialized = try system.serialization.serialize(map) + let deserialized = try system.serialization.deserialize(as: CRDT.ORMap>.self, from: serialized) + + "\(deserialized.replicaID)".shouldContain("actor:sact://CRDTSerializationTests@localhost:9001/user/alpha") + deserialized._keys.elements.shouldEqual(["s1", "s2"]) + deserialized._values.count.shouldEqual(2) + + guard let s1 = deserialized["s1"] else { + throw shouldNotHappen("Expect deserialized to contain \"s1\", got \(deserialized)") + } + s1.elements.shouldEqual(["a", "c"]) + + guard let s2 = deserialized["s2"] else { + throw shouldNotHappen("Expect deserialized to contain \"s2\", got \(deserialized)") + } + s2.elements.shouldEqual(["b"]) + + // Contains same element as `values` + deserialized.updatedValues.count.shouldEqual(2) + + // Derived from `updatedValues` + deserialized.delta.shouldNotBeNil() + deserialized.delta!.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) // same elements added to delta + deserialized.delta!.values.count.shouldEqual(2) + } + } + + func test_serializationOf_ORMap_delta() throws { + try shouldNotThrow { + var map = CRDT.ORMap>(replicaID: .actorAddress(self.ownerAlpha), defaultValue: CRDT.ORSet(replicaID: .actorAddress(self.ownerAlpha))) + map.update(key: "s1") { $0.add("a") } + map.update(key: "s2") { $0.add("b") } + map.update(key: "s1") { $0.add("c") } + map.delta.shouldNotBeNil() + + let serialized = try system.serialization.serialize(map.delta!) // !-safe, must have a delta, we just checked it + let deserialized = try system.serialization.deserialize(as: CRDT.ORMap>.Delta.self, from: serialized) + + deserialized.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) + deserialized.values.count.shouldEqual(2) + + // delta contains the same elements as map + guard let s1 = deserialized.values["s1"] else { + throw shouldNotHappen("Expect deserialized to contain \"s1\", got \(deserialized)") + } + s1.elements.shouldEqual(["a", "c"]) + + guard let s2 = deserialized.values["s2"] else { + throw shouldNotHappen("Expect deserialized to contain \"s2\", got \(deserialized)") + } + s2.elements.shouldEqual(["b"]) + } + } + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: LWWRegister diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift index 122089512..36bda2af6 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift @@ -20,7 +20,7 @@ final class CRDTGCounterTests: XCTestCase { let replicaA: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("a"), incarnation: .wellKnown)) let replicaB: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("b"), incarnation: .wellKnown)) - func test_GCounter_increment_shouldUpdateDelta() throws { + func test_increment_shouldUpdateDelta() throws { var g1 = CRDT.GCounter(replicaID: self.replicaA) g1.increment(by: 1) @@ -36,7 +36,7 @@ final class CRDTGCounterTests: XCTestCase { g1.delta!.state[g1.replicaID]!.shouldEqual(11) // 1 + 10 } - func test_GCounter_merge_shouldMutate() throws { + func test_merge_shouldMutate() throws { var g1 = CRDT.GCounter(replicaID: self.replicaA) g1.increment(by: 1) var g2 = CRDT.GCounter(replicaID: self.replicaB) @@ -49,7 +49,7 @@ final class CRDTGCounterTests: XCTestCase { g2.value.shouldEqual(10) // unchanged } - func test_GCounter_merging_shouldNotMutate() throws { + func test_merging_shouldNotMutate() throws { var g1 = CRDT.GCounter(replicaID: self.replicaA) g1.increment(by: 1) var g2 = CRDT.GCounter(replicaID: self.replicaB) @@ -65,7 +65,7 @@ final class CRDTGCounterTests: XCTestCase { g3.value.shouldEqual(11) // 1 (g1) + 10 (g2) } - func test_GCounter_mergeDelta_shouldMutate() throws { + func test_mergeDelta_shouldMutate() throws { var g1 = CRDT.GCounter(replicaID: self.replicaA) g1.increment(by: 1) var g2 = CRDT.GCounter(replicaID: self.replicaB) @@ -80,7 +80,7 @@ final class CRDTGCounterTests: XCTestCase { g1.value.shouldEqual(11) // 1 (g1) + 10 (g2 delta) } - func test_GCounter_mergingDelta_shouldNotMutate() throws { + func test_mergingDelta_shouldNotMutate() throws { var g1 = CRDT.GCounter(replicaID: self.replicaA) g1.increment(by: 1) var g2 = CRDT.GCounter(replicaID: self.replicaB) @@ -97,7 +97,7 @@ final class CRDTGCounterTests: XCTestCase { g3.value.shouldEqual(11) // 1 (g1) + 10 (g2 delta) } - func test_GCounter_reset() throws { + func test_reset() throws { var g1 = CRDT.GCounter(replicaID: self.replicaA) g1.increment(by: 1) g1.increment(by: 5) @@ -106,4 +106,15 @@ final class CRDTGCounterTests: XCTestCase { g1.reset() g1.value.shouldEqual(0) } + + func test_clone() throws { + var g = CRDT.GCounter(replicaID: self.replicaA) + g.increment(by: 3) + + let clone = g.clone() + clone.replicaID.shouldEqual(g.replicaID) + clone.value.shouldEqual(g.value) + clone.state.shouldEqual(g.state) + clone.delta.shouldNotBeNil() + } } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift index d2439175d..8b877b095 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift @@ -20,7 +20,7 @@ final class CRDTLWWMapTests: XCTestCase { let replicaA: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("a"), incarnation: .wellKnown)) let replicaB: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("b"), incarnation: .wellKnown)) - func test_LWWMap_basicOperations() throws { + func test_basicOperations() throws { var m1 = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 0) m1.underlying.shouldEqual([:]) @@ -91,7 +91,7 @@ final class CRDTLWWMapTests: XCTestCase { m1.isEmpty.shouldBeTrue() } - func test_LWWMap_update_remove_shouldUpdateDelta() throws { + func test_update_remove_shouldUpdateDelta() throws { var m1 = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 0) m1.set(forKey: "foo", value: 5) @@ -175,7 +175,7 @@ final class CRDTLWWMapTests: XCTestCase { d4foo.value.shouldEqual(6) } - func test_LWWMap_merge_shouldMutate() throws { + func test_merge_shouldMutate() throws { var m1 = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 0) var m2 = CRDT.LWWMap(replicaID: self.replicaB, defaultValue: 0) @@ -233,7 +233,7 @@ final class CRDTLWWMapTests: XCTestCase { m1.state._keys.state.elementByBirthDot[VersionDot(self.replicaB, 3)]!.shouldEqual("foo") } - func test_LWWMap_mergeDelta_shouldMutate() throws { + func test_mergeDelta_shouldMutate() throws { var m1 = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 0) // ORSet `keys`: [(A,1): "foo", (A,2): "bar"] // `values`: ["foo": 8, "bar": 6] @@ -281,7 +281,7 @@ final class CRDTLWWMapTests: XCTestCase { m1.state._keys.state.elementByBirthDot[VersionDot(self.replicaB, 2)]!.shouldEqual("baz") } - func test_LWWMap_resetValue_resetAllValues() throws { + func test_resetValue_resetAllValues() throws { var m1 = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 0) m1.set(forKey: "foo", value: 2) m1.set(forKey: "bar", value: 6) // (A,2) @@ -315,4 +315,23 @@ final class CRDTLWWMapTests: XCTestCase { } bar3.shouldEqual(0) } + + func test_clone() throws { + var m = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 5) + m.set(forKey: "foo", value: 2) + + let clone = m.clone() + clone.replicaID.shouldEqual(m.replicaID) + clone.keys.shouldEqual(m.keys) + + guard let mFoo = m["foo"] else { + throw shouldNotHappen("Expect m to contain \"foo\", got \(m)") + } + guard let cloneFoo = clone["foo"] else { + throw shouldNotHappen("Expect clone to contain \"foo\", got \(clone)") + } + cloneFoo.shouldEqual(mFoo) + + clone.delta.shouldNotBeNil() + } } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift index 3561621da..6bb3323e5 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift @@ -20,7 +20,7 @@ final class CRDTLWWRegisterTests: XCTestCase { let replicaA: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("a"), incarnation: .wellKnown)) let replicaB: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("b"), incarnation: .wellKnown)) - func test_LWWRegister_assign_shouldSetValueAndTimestamp() throws { + func test_assign_shouldSetValueAndTimestamp() throws { var r1 = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 3) r1.value.shouldEqual(3) r1.updatedBy.shouldEqual(self.replicaA) @@ -36,7 +36,7 @@ final class CRDTLWWRegisterTests: XCTestCase { r1.initialValue.shouldEqual(3) // doesn't change } - func test_LWWRegister_merge_shouldMutateIfMoreRecentTimestamp() throws { + func test_merge_shouldMutateIfMoreRecentTimestamp() throws { let r1Clock = WallTimeClock() var r1 = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 3, clock: r1Clock) // Make sure r2's assignment has a more recent timestamp @@ -54,7 +54,7 @@ final class CRDTLWWRegisterTests: XCTestCase { r2.value.shouldEqual(5) // unchanged } - func test_LWWRegister_merge_shouldNotMutateIfOlderTimestamp() throws { + func test_merge_shouldNotMutateIfOlderTimestamp() throws { let r1Clock = WallTimeClock() var r1 = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 3, clock: r1Clock) // Make sure r2's assignment has an older timestamp @@ -70,7 +70,7 @@ final class CRDTLWWRegisterTests: XCTestCase { r1.updatedBy.shouldEqual(self.replicaA) } - func test_LWWRegister_merging_shouldNotMutate() throws { + func test_merging_shouldNotMutate() throws { let r1Clock = WallTimeClock() let r1 = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 3, clock: r1Clock) // Make sure r2's assignment has a more recent timestamp @@ -88,7 +88,7 @@ final class CRDTLWWRegisterTests: XCTestCase { r3.initialValue.shouldEqual(r1.initialValue) // r3 is built from r1 } - func test_LWWRegister_reset() throws { + func test_reset() throws { var r1 = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 3) r1.initialValue.shouldEqual(3) @@ -102,7 +102,20 @@ final class CRDTLWWRegisterTests: XCTestCase { r1.value.shouldEqual(3) } - func test_LWWRegister_optionalValueType() throws { + func test_clone() throws { + var r = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 6) + r.assign(8) + + let clone = r.clone() + clone.replicaID.shouldEqual(r.replicaID) + clone.initialValue.shouldEqual(r.initialValue) + clone.value.shouldEqual(r.value) + // `TimeInterval` is `Double` + XCTAssertEqual(clone.clock.timestamp.timeIntervalSince1970, r.clock.timestamp.timeIntervalSince1970, accuracy: 1) + clone.updatedBy.shouldEqual(r.updatedBy) + } + + func test_optionalValueType() throws { var r1 = CRDT.LWWRegister(replicaID: self.replicaA) r1.initialValue.shouldBeNil() r1.value.shouldBeNil() diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift index cd09522d5..15aa9805a 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift @@ -24,7 +24,7 @@ final class CRDTORMapTests: XCTestCase { // MARK: ORMap + GCounter tests func test_ORMap_GCounter_basicOperations() throws { - var m1 = CRDT.ORMap(replicaID: self.replicaA, valueInitializer: { CRDT.GCounter(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) m1.keys.isEmpty.shouldBeTrue() m1.values.isEmpty.shouldBeTrue() @@ -79,7 +79,7 @@ final class CRDTORMapTests: XCTestCase { } func test_ORMap_GCounter_update_remove_shouldUpdateDelta() throws { - var m1 = CRDT.ORMap(replicaID: self.replicaA, valueInitializer: { CRDT.GCounter(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) m1.update(key: "g1") { $0.increment(by: 5) } m1.count.shouldEqual(1) @@ -163,9 +163,9 @@ final class CRDTORMapTests: XCTestCase { } func test_ORMap_GCounter_merge_shouldMutate() throws { - var m1 = CRDT.ORMap(replicaID: self.replicaA, valueInitializer: { CRDT.GCounter(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) - var m2 = CRDT.ORMap(replicaID: self.replicaB, valueInitializer: { CRDT.GCounter(replicaID: self.replicaB) }) + var m2 = CRDT.ORMap(replicaID: self.replicaB, defaultValue: CRDT.GCounter(replicaID: self.replicaB)) // ORSet `keys`: [(B,1): "g1"] // `values`: ["g1": GCounter(value = 5)] m2.update(key: "g1") { // (B,1) @@ -228,7 +228,7 @@ final class CRDTORMapTests: XCTestCase { } func test_ORMap_GCounter_mergeDelta_shouldMutate() throws { - var m1 = CRDT.ORMap(replicaID: self.replicaA, valueInitializer: { CRDT.GCounter(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) // ORSet `keys`: [(A,1): "g1", (A,2): "g2"] // `values`: ["g1": GCounter(value = 8), "g2": GCounter(value = 6)] m1.update(key: "g1") { // (A,1) @@ -238,7 +238,7 @@ final class CRDTORMapTests: XCTestCase { $0.increment(by: 6) } - var m2 = CRDT.ORMap(replicaID: self.replicaB, valueInitializer: { CRDT.GCounter(replicaID: self.replicaB) }) + var m2 = CRDT.ORMap(replicaID: self.replicaB, defaultValue: CRDT.GCounter(replicaID: self.replicaB)) // ORSet `keys`: [(B,1): "g2", (B,2): "g3", (B,3): "g1"] // `values`: ["g1": GCounter(value = 2), "g2": GCounter(value = 3), "g3": GCounter(value = 5)] m2.update(key: "g2") { // (B,1) @@ -285,7 +285,7 @@ final class CRDTORMapTests: XCTestCase { } func test_ORMap_GCounter_resetValue_resetAllValues() throws { - var m1 = CRDT.ORMap(replicaID: self.replicaA, valueInitializer: { CRDT.GCounter(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) m1.update(key: "g1") { $0.increment(by: 2) } @@ -323,11 +323,34 @@ final class CRDTORMapTests: XCTestCase { ggg2.value.shouldEqual(0) } + func test_ORMap_GCounter_clone() throws { + var m = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) + m.update(key: "g1") { + $0.increment(by: 2) + } + + let clone = m.clone() + clone.replicaID.shouldEqual(m.replicaID) + clone.defaultValue.replicaID.shouldEqual(m.defaultValue.replicaID) + clone.defaultValue.value.shouldEqual(m.defaultValue.value) + clone.keys.shouldEqual(m.keys) + + guard let mG1 = m["g1"] else { + throw shouldNotHappen("Expect m to contain \"g1\", got \(m)") + } + guard let cloneG1 = clone["g1"] else { + throw shouldNotHappen("Expect clone to contain \"g1\", got \(clone)") + } + cloneG1.value.shouldEqual(mG1.value) + + clone.delta.shouldNotBeNil() + } + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: ORMap + ORSet tests func test_ORMap_ORSet_basicOperations() throws { - var m1 = CRDT.ORMap>(replicaID: self.replicaA, valueInitializer: { CRDT.ORSet(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap>(replicaID: self.replicaA, defaultValue: CRDT.ORSet(replicaID: self.replicaA)) m1.keys.isEmpty.shouldBeTrue() m1.values.isEmpty.shouldBeTrue() @@ -382,7 +405,7 @@ final class CRDTORMapTests: XCTestCase { } func test_ORMap_ORSet_removeValue_shouldRemoveInOtherReplicas() throws { - var m1 = CRDT.ORMap>(replicaID: self.replicaA, valueInitializer: { CRDT.ORSet(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap>(replicaID: self.replicaA, defaultValue: CRDT.ORSet(replicaID: self.replicaA)) // ORSet `keys`: [(A,1): "s1", (A,2): "s2"] // `values`: ["s1": [1, 5], "s2": [3]] m1.update(key: "s1") { // (A,1) @@ -393,7 +416,7 @@ final class CRDTORMapTests: XCTestCase { $0.add(3) } - var m2 = CRDT.ORMap>(replicaID: self.replicaB, valueInitializer: { CRDT.ORSet(replicaID: self.replicaB) }) + var m2 = CRDT.ORMap>(replicaID: self.replicaB, defaultValue: CRDT.ORSet(replicaID: self.replicaB)) guard let delta1 = m1.delta else { throw shouldNotHappen("m1.delta should not be nil after updates") @@ -446,7 +469,7 @@ final class CRDTORMapTests: XCTestCase { /// ORSet) when replication has not been propagated yet and there are concurrent updates, because of the loss of /// causal history associated with `unsafeRemoveValue`. func test_ORMap_ORSet_removeValue_revivesDeletedElementsOnMerge() throws { - var m1 = CRDT.ORMap>(replicaID: self.replicaA, valueInitializer: { CRDT.ORSet(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap>(replicaID: self.replicaA, defaultValue: CRDT.ORSet(replicaID: self.replicaA)) // ORSet `keys`: [(A,1): "s1", (A,2): "s2"] // `values`: ["s1": [1, 5], "s2": [3]] m1.update(key: "s1") { // (A,1) @@ -457,7 +480,7 @@ final class CRDTORMapTests: XCTestCase { $0.add(3) } - var m2 = CRDT.ORMap>(replicaID: self.replicaB, valueInitializer: { CRDT.ORSet(replicaID: self.replicaB) }) + var m2 = CRDT.ORMap>(replicaID: self.replicaB, defaultValue: CRDT.ORSet(replicaID: self.replicaB)) guard let delta = m1.delta else { throw shouldNotHappen("m1.delta should not be nil after updates") @@ -521,7 +544,7 @@ final class CRDTORMapTests: XCTestCase { /// causal history is retained and deleted elements will not come back even though changes might not have been /// replicated yet. func test_ORMap_ORSet_update_deletedElementsShouldNotReviveOnMerge() throws { - var m1 = CRDT.ORMap>(replicaID: self.replicaA, valueInitializer: { CRDT.ORSet(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap>(replicaID: self.replicaA, defaultValue: CRDT.ORSet(replicaID: self.replicaA)) // ORSet `keys`: [(A,1): "s1", (A,2): "s2"] // `values`: ["s1": [1, 5], "s2": [3]] m1.update(key: "s1") { // (A,1) @@ -532,7 +555,7 @@ final class CRDTORMapTests: XCTestCase { $0.add(3) } - var m2 = CRDT.ORMap>(replicaID: self.replicaB, valueInitializer: { CRDT.ORSet(replicaID: self.replicaB) }) + var m2 = CRDT.ORMap>(replicaID: self.replicaB, defaultValue: CRDT.ORSet(replicaID: self.replicaB)) guard let delta = m1.delta else { throw shouldNotHappen("m1.delta should not be nil after updates") @@ -597,7 +620,7 @@ final class CRDTORMapTests: XCTestCase { } func test_ORMap_ORSet_resetValue_resetAllValues() throws { - var m1 = CRDT.ORMap>(replicaID: self.replicaA, valueInitializer: { CRDT.ORSet(replicaID: self.replicaA) }) + var m1 = CRDT.ORMap>(replicaID: self.replicaA, defaultValue: CRDT.ORSet(replicaID: self.replicaA)) m1.update(key: "s1") { $0.add(1) } m1.update(key: "s2") { $0.add(3) } m1.update(key: "s1") { $0.add(5) } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift index 09455dcc6..36cd91403 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift @@ -20,7 +20,7 @@ final class CRDTORMultiMapTests: XCTestCase { let replicaA: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("a"), incarnation: .wellKnown)) let replicaB: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("b"), incarnation: .wellKnown)) - func test_ORMultiMap_basicOperations() throws { + func test_basicOperations() throws { var m1 = CRDT.ORMultiMap(replicaID: self.replicaA) m1.keys.isEmpty.shouldBeTrue() @@ -167,7 +167,7 @@ final class CRDTORMultiMapTests: XCTestCase { dsss1.elements.shouldEqual([16]) } - func test_ORMultiMap_merge_shouldMutate() throws { + func test_merge_shouldMutate() throws { var m1 = CRDT.ORMultiMap(replicaID: self.replicaA) var m2 = CRDT.ORMultiMap(replicaID: self.replicaB) @@ -222,7 +222,7 @@ final class CRDTORMultiMapTests: XCTestCase { m1.state._keys.state.elementByBirthDot[VersionDot(self.replicaB, 3)]!.shouldEqual("s1") } - func test_ORMultiMap_mergeDelta_shouldMutate() throws { + func test_mergeDelta_shouldMutate() throws { var m1 = CRDT.ORMultiMap(replicaID: self.replicaA) // ORSet `keys`: [(A,1): "s1", (A,2): "s2"] // `values`: ["s1": [8], "s2": [6]] @@ -269,7 +269,7 @@ final class CRDTORMultiMapTests: XCTestCase { m1.state._keys.state.elementByBirthDot[VersionDot(self.replicaB, 3)]!.shouldEqual("s1") } - func test_ORMultiMap_removeAll() throws { + func test_removeAll() throws { var m1 = CRDT.ORMultiMap(replicaID: self.replicaA) m1.add(forKey: "s1", 2) m1.add(forKey: "s2", 6) @@ -296,4 +296,23 @@ final class CRDTORMultiMapTests: XCTestCase { } ss2.shouldEqual([6]) // no change } + + func test_clone() throws { + var m = CRDT.ORMultiMap(replicaID: self.replicaA) + m.add(forKey: "s1", 2) + + let clone = m.clone() + clone.replicaID.shouldEqual(m.replicaID) + clone.keys.shouldEqual(m.keys) + + guard let mS1 = m["s1"] else { + throw shouldNotHappen("Expect m to contain \"s1\", got \(m)") + } + guard let cloneS1 = clone["s1"] else { + throw shouldNotHappen("Expect clone to contain \"s1\", got \(clone)") + } + cloneS1.shouldEqual(mS1) + + clone.delta.shouldNotBeNil() + } } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift index e2a44dd72..eba3b15f2 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift @@ -20,7 +20,7 @@ final class CRDTORSetTests: XCTestCase { let replicaA: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("a"), incarnation: .wellKnown)) let replicaB: ReplicaID = .actorAddress(try! ActorAddress(path: ActorPath._user.appending("b"), incarnation: .wellKnown)) - func test_ORSet_basicOperations() throws { + func test_basicOperations() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) s1.elements.isEmpty.shouldBeTrue() @@ -46,7 +46,7 @@ final class CRDTORSetTests: XCTestCase { s1.isEmpty.shouldBeTrue() } - func test_ORSet_add_remove_shouldUpdateDelta() throws { + func test_add_remove_shouldUpdateDelta() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) // version 1 @@ -92,7 +92,7 @@ final class CRDTORSetTests: XCTestCase { d4.elementByBirthDot[VersionDot(s1.replicaID, 3)]!.shouldEqual(3) } - func test_ORSet_merge_shouldMutate() throws { + func test_merge_shouldMutate() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) s1.add(1) s1.add(3) @@ -116,7 +116,7 @@ final class CRDTORSetTests: XCTestCase { // (B,1): 3 and (B,3): 1 come from a different replica (B), so A cannot coalesce them. } - func test_ORSet_merge_shouldMutate_shouldCompact() throws { + func test_merge_shouldMutate_shouldCompact() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) var s2 = CRDT.ORSet(replicaID: self.replicaB) @@ -146,7 +146,7 @@ final class CRDTORSetTests: XCTestCase { s1.state.elementByBirthDot[VersionDot(s2.replicaID, 3)]!.shouldEqual(7) // (B,3): 7 } - func test_ORSet_mergeDelta_shouldMutate() throws { + func test_mergeDelta_shouldMutate() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) s1.add(1) s1.add(3) @@ -173,7 +173,7 @@ final class CRDTORSetTests: XCTestCase { // (B,1): 3 and (B,3): 1 come from a different replica (B), so A cannot coalesce them. } - func test_ORSet_mergeDelta_shouldMutate_shouldCompact() throws { + func test_mergeDelta_shouldMutate_shouldCompact() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) var s2 = CRDT.ORSet(replicaID: self.replicaB) @@ -206,7 +206,7 @@ final class CRDTORSetTests: XCTestCase { s1.state.elementByBirthDot[VersionDot(s2.replicaID, 3)]!.shouldEqual(7) // (B,3): 7 } - func test_ORSet_reset() throws { + func test_reset() throws { var s1 = CRDT.ORSet(replicaID: self.replicaA) s1.add(1) s1.add(3) @@ -215,4 +215,14 @@ final class CRDTORSetTests: XCTestCase { s1.reset() s1.isEmpty.shouldBeTrue() } + + func test_clone() throws { + var s = CRDT.ORSet(replicaID: self.replicaA) + s.add(3) + + let clone = s.clone() + clone.replicaID.shouldEqual(s.replicaID) + clone.elements.shouldEqual(s.elements) + clone.delta.shouldNotBeNil() + } } From bac634cfd88cee19eda6c2f361d37e201d93ce1e Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 28 Apr 2020 22:36:43 -0700 Subject: [PATCH 2/5] fix formatting --- .../CRDT/Protobuf/CRDT+SerializationTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift index df13743ed..e84225e34 100644 --- a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift @@ -230,7 +230,7 @@ final class CRDTSerializationTests: ActorSystemTestBase { "\(deserialized.replicaID)".shouldContain("actor:sact://CRDTSerializationTests@localhost:9001/user/alpha") deserialized._keys.elements.shouldEqual(["s1", "s2"]) deserialized._values.count.shouldEqual(2) - + guard let s1 = deserialized["s1"] else { throw shouldNotHappen("Expect deserialized to contain \"s1\", got \(deserialized)") } @@ -264,7 +264,7 @@ final class CRDTSerializationTests: ActorSystemTestBase { deserialized.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) deserialized.values.count.shouldEqual(2) - + // delta contains the same elements as map guard let s1 = deserialized.values["s1"] else { throw shouldNotHappen("Expect deserialized to contain \"s1\", got \(deserialized)") From 908e7b90d70b1b3ed095410ff4cf336adb9e4146 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Tue, 28 Apr 2020 23:42:09 -0700 Subject: [PATCH 3/5] Remove CloneableCRDT. Assume CRDTs have value semantics. --- .../CRDT/Protobuf/CRDT+Serialization.swift | 8 ++--- .../CRDT/Types/CRDT+GCounter.swift | 12 -------- .../CRDT/Types/CRDT+LWWMap.swift | 11 ------- .../CRDT/Types/CRDT+LWWRegister.swift | 14 --------- .../CRDT/Types/CRDT+ORMap.swift | 29 +++++-------------- .../CRDT/Types/CRDT+ORMultiMap.swift | 11 ------- .../CRDT/Types/CRDT+ORSet.swift | 11 ------- .../CRDT/Types/CRDT+StateBased.swift | 8 ++--- .../CRDT/Types/CRDTGCounterTests.swift | 11 ------- .../CRDT/Types/CRDTLWWMapTests.swift | 19 ------------ .../CRDT/Types/CRDTLWWRegisterTests.swift | 13 --------- .../CRDT/Types/CRDTORMapTests.swift | 23 --------------- .../CRDT/Types/CRDTORMultiMapTests.swift | 19 ------------ .../CRDT/Types/CRDTORSetTests.swift | 10 ------- 14 files changed, 15 insertions(+), 184 deletions(-) diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift index 9773f5d3b..5fbbf70e4 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift @@ -267,7 +267,7 @@ private enum ORMapSerializationUtils { ) } - static func valueToProto(_ value: Value, context: Serialization.Context) throws -> ProtoCRDTORMapValue { + static func valueToProto(_ value: Value, context: Serialization.Context) throws -> ProtoCRDTORMapValue { let serialized = try context.serialization.serialize(value) var proto = ProtoCRDTORMapValue() proto.manifest = try serialized.manifest.toProto(context: context) @@ -275,7 +275,7 @@ private enum ORMapSerializationUtils { return proto } - static func valueFromProto(_ proto: ProtoCRDTORMapValue, context: Serialization.Context) throws -> Value { + static func valueFromProto(_ proto: ProtoCRDTORMapValue, context: Serialization.Context) throws -> Value { try context.serialization.deserialize( as: Value.self, from: .data(proto.payload), @@ -283,7 +283,7 @@ private enum ORMapSerializationUtils { ) } - static func valuesToProto(_ values: [Key: Value], context: Serialization.Context) throws -> [ProtoCRDTORMapKeyValue] { + static func valuesToProto(_ values: [Key: Value], context: Serialization.Context) throws -> [ProtoCRDTORMapKeyValue] { try values.map { key, value in var proto = ProtoCRDTORMapKeyValue() proto.key = try keyToProto(key, context: context) @@ -292,7 +292,7 @@ private enum ORMapSerializationUtils { } } - static func valuesFromProto(_ proto: [ProtoCRDTORMapKeyValue], context: Serialization.Context) throws -> [Key: Value] { + static func valuesFromProto(_ proto: [ProtoCRDTORMapKeyValue], context: Serialization.Context) throws -> [Key: Value] { try proto.reduce(into: [Key: Value]()) { result, protoKeyValue in guard protoKeyValue.hasKey else { throw SerializationError.missingField("key", type: String(describing: [Key: Value].self)) diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift b/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift index d35bd4d28..2435db339 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+GCounter.swift @@ -128,18 +128,6 @@ extension CRDT.GCounter: ResettableCRDT { } } -extension CRDT.GCounter: CloneableCRDT { - private init(replicaID: ReplicaID, state: [ReplicaID: Int], delta: Delta?) { - self.replicaID = replicaID - self.state = state - self.delta = delta - } - - public func clone() -> CRDT.GCounter { - CRDT.GCounter(replicaID: self.replicaID, state: self.state, delta: self.delta) - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned GCounter diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift index 917d5209e..e84647e82 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+LWWMap.swift @@ -124,17 +124,6 @@ extension CRDT { } } -extension CRDT.LWWMap: CloneableCRDT { - private init(replicaID: ReplicaID, state: CRDT.ORMap>) { - self.replicaID = replicaID - self.state = state - } - - public func clone() -> CRDT.LWWMap { - CRDT.LWWMap(replicaID: self.replicaID, state: state) - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned LWWMap diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift b/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift index 014189b35..6b9c713bc 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+LWWRegister.swift @@ -83,20 +83,6 @@ extension CRDT.LWWRegister: ResettableCRDT { } } -extension CRDT.LWWRegister: CloneableCRDT { - private init(replicaID: ReplicaID, initialValue: Value, value: Value, clock: WallTimeClock, updatedBy: ReplicaID) { - self.replicaID = replicaID - self.initialValue = initialValue - self.value = value - self.clock = clock - self.updatedBy = updatedBy - } - - public func clone() -> CRDT.LWWRegister { - CRDT.LWWRegister(replicaID: self.replicaID, initialValue: self.initialValue, value: self.value, clock: self.clock, updatedBy: self.updatedBy) - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned LWWRegister diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift index 83f608f19..f3ffdf117 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift @@ -51,7 +51,7 @@ extension CRDT { /// /// - SeeAlso: [Delta State Replicated Data Types](https://arxiv.org/pdf/1603.01529.pdf) /// - SeeAlso: `CRDT.ORSet` - public struct ORMap: NamedDeltaCRDT, ORMapOperations { + public struct ORMap: NamedDeltaCRDT, ORMapOperations { public typealias Delta = ORMapDelta public let replicaID: ReplicaID @@ -108,7 +108,7 @@ extension CRDT { self._keys.add(key) // Apply `mutator` to the value then save it to state. Create `Value` if needed. - var value = self._values[key] ?? self.defaultValue.clone() + var value = self._values[key] ?? self.defaultValue mutator(&value) self._values[key] = value @@ -170,7 +170,7 @@ extension CRDT { } } - public struct ORMapDelta: CvRDT { + public struct ORMapDelta: CvRDT { var keys: ORSet.Delta // TODO: potential optimization: send only the delta if Value is DeltaCRDT. i.e., instead of Value here we would use Value.Delta // TODO: `merge` defined in the Dictionary extension below should use `mergeDelta` when Value is DeltaCRDT @@ -204,7 +204,7 @@ extension CRDT { } } -extension Dictionary where Key: Hashable, Value: CvRDT & CloneableCRDT { +extension Dictionary where Key: Hashable, Value: CvRDT { internal mutating func merge(keys: Set, other: [Key: Value], defaultValue: Value) { // Remove from `self` and `other` keys that no longer exist self = self.filter { k, _ in keys.contains(k) } @@ -214,7 +214,7 @@ extension Dictionary where Key: Hashable, Value: CvRDT & CloneableCRDT { for (k, rv) in other { // If `k` is not found in `self` then create a new `Value` instance. // We must NOT copy `other`'s value directly to `self` because the two should have different replica IDs. - var lv: Value = self[k] ?? defaultValue.clone() + var lv: Value = self[k] ?? defaultValue lv.merge(other: rv) self[k] = lv } @@ -241,20 +241,6 @@ extension CRDT.ORMap: ORMapWithResettableValue where Value: ResettableCRDT { } } -extension CRDT.ORMap: CloneableCRDT { - private init(replicaID: ReplicaID, defaultValue: Value, keys: CRDT.ORSet, values: [Key: Value], updatedValues: [Key: Value]) { - self.replicaID = replicaID - self.defaultValue = defaultValue - self._keys = keys - self._values = values - self.updatedValues = updatedValues - } - - public func clone() -> CRDT.ORMap { - CRDT.ORMap(replicaID: self.replicaID, defaultValue: self.defaultValue, keys: self._keys, values: self._values, updatedValues: self.updatedValues) - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned - Common protocols and extensions for generic and specialized ORMap types (e.g., ORMap, LWWMap) @@ -275,11 +261,12 @@ public protocol ORMapWithUnsafeRemove { mutating func unsafeRemoveAllValues() } +/// Additional `ORMap` methods when `Value` type conforms to `ResettableCRDT`. public protocol ORMapWithResettableValue: ORMapWithUnsafeRemove { - /// Resets value for `key` to `defaultValue` provided in `init`. + /// Resets value for `key` by calling `ResettableCRDT.reset()`. mutating func resetValue(forKey key: Key) - /// Resets all values in the `LWWMap` to `defaultValue` provided in `init`. + /// Resets all values in the `ORMap` by calling `ResettableCRDT.reset()`. mutating func resetAllValues() } diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift index e15c169f7..068a98587 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMultiMap.swift @@ -120,17 +120,6 @@ extension CRDT { } } -extension CRDT.ORMultiMap: CloneableCRDT { - private init(replicaID: ReplicaID, state: CRDT.ORMap>) { - self.replicaID = replicaID - self.state = state - } - - public func clone() -> CRDT.ORMultiMap { - CRDT.ORMultiMap(replicaID: self.replicaID, state: self.state) - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned ORMultiMap diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift index 4fa19ac76..dd7f8aa93 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORSet.swift @@ -155,17 +155,6 @@ extension CRDT.ORSet: ResettableCRDT { } } -extension CRDT.ORSet: CloneableCRDT { - private init(replicaID: ReplicaID, state: CRDT.VersionedContainer) { - self.replicaID = replicaID - self.state = state - } - - public func clone() -> CRDT.ORSet { - CRDT.ORSet(replicaID: self.replicaID, state: self.state) - } -} - // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: ActorOwned ORSet diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift b/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift index 28c0a03fc..a9e88cdce 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift @@ -19,6 +19,9 @@ import NIO // MARK: Core CRDT protocols /// Top-level protocol for any kind of state based CRDT (also known as `CvRDT`). +/// +/// ** Note: ** CRDT **must** have value semantics. Assumptions have been made in code with this being true, +/// and there might be undesirable consequences otherwise. public protocol StateBasedCRDT: Codable { /// Attempts to merge the state of the given data type instance into this data type instance. /// @@ -133,8 +136,3 @@ public protocol NamedDeltaCRDT: DeltaCRDT { public protocol ResettableCRDT { mutating func reset() } - -/// CRDT that can be cloned -public protocol CloneableCRDT { - func clone() -> Self -} diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift index 36bda2af6..b11b01607 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTGCounterTests.swift @@ -106,15 +106,4 @@ final class CRDTGCounterTests: XCTestCase { g1.reset() g1.value.shouldEqual(0) } - - func test_clone() throws { - var g = CRDT.GCounter(replicaID: self.replicaA) - g.increment(by: 3) - - let clone = g.clone() - clone.replicaID.shouldEqual(g.replicaID) - clone.value.shouldEqual(g.value) - clone.state.shouldEqual(g.state) - clone.delta.shouldNotBeNil() - } } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift index 8b877b095..3aecd1045 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWMapTests.swift @@ -315,23 +315,4 @@ final class CRDTLWWMapTests: XCTestCase { } bar3.shouldEqual(0) } - - func test_clone() throws { - var m = CRDT.LWWMap(replicaID: self.replicaA, defaultValue: 5) - m.set(forKey: "foo", value: 2) - - let clone = m.clone() - clone.replicaID.shouldEqual(m.replicaID) - clone.keys.shouldEqual(m.keys) - - guard let mFoo = m["foo"] else { - throw shouldNotHappen("Expect m to contain \"foo\", got \(m)") - } - guard let cloneFoo = clone["foo"] else { - throw shouldNotHappen("Expect clone to contain \"foo\", got \(clone)") - } - cloneFoo.shouldEqual(mFoo) - - clone.delta.shouldNotBeNil() - } } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift index 6bb3323e5..276cc785a 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTLWWRegisterTests.swift @@ -102,19 +102,6 @@ final class CRDTLWWRegisterTests: XCTestCase { r1.value.shouldEqual(3) } - func test_clone() throws { - var r = CRDT.LWWRegister(replicaID: self.replicaA, initialValue: 6) - r.assign(8) - - let clone = r.clone() - clone.replicaID.shouldEqual(r.replicaID) - clone.initialValue.shouldEqual(r.initialValue) - clone.value.shouldEqual(r.value) - // `TimeInterval` is `Double` - XCTAssertEqual(clone.clock.timestamp.timeIntervalSince1970, r.clock.timestamp.timeIntervalSince1970, accuracy: 1) - clone.updatedBy.shouldEqual(r.updatedBy) - } - func test_optionalValueType() throws { var r1 = CRDT.LWWRegister(replicaID: self.replicaA) r1.initialValue.shouldBeNil() diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift index 15aa9805a..99e48fccf 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMapTests.swift @@ -323,29 +323,6 @@ final class CRDTORMapTests: XCTestCase { ggg2.value.shouldEqual(0) } - func test_ORMap_GCounter_clone() throws { - var m = CRDT.ORMap(replicaID: self.replicaA, defaultValue: CRDT.GCounter(replicaID: self.replicaA)) - m.update(key: "g1") { - $0.increment(by: 2) - } - - let clone = m.clone() - clone.replicaID.shouldEqual(m.replicaID) - clone.defaultValue.replicaID.shouldEqual(m.defaultValue.replicaID) - clone.defaultValue.value.shouldEqual(m.defaultValue.value) - clone.keys.shouldEqual(m.keys) - - guard let mG1 = m["g1"] else { - throw shouldNotHappen("Expect m to contain \"g1\", got \(m)") - } - guard let cloneG1 = clone["g1"] else { - throw shouldNotHappen("Expect clone to contain \"g1\", got \(clone)") - } - cloneG1.value.shouldEqual(mG1.value) - - clone.delta.shouldNotBeNil() - } - // ==== ------------------------------------------------------------------------------------------------------------ // MARK: ORMap + ORSet tests diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift index 36cd91403..66d5e54c2 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTORMultiMapTests.swift @@ -296,23 +296,4 @@ final class CRDTORMultiMapTests: XCTestCase { } ss2.shouldEqual([6]) // no change } - - func test_clone() throws { - var m = CRDT.ORMultiMap(replicaID: self.replicaA) - m.add(forKey: "s1", 2) - - let clone = m.clone() - clone.replicaID.shouldEqual(m.replicaID) - clone.keys.shouldEqual(m.keys) - - guard let mS1 = m["s1"] else { - throw shouldNotHappen("Expect m to contain \"s1\", got \(m)") - } - guard let cloneS1 = clone["s1"] else { - throw shouldNotHappen("Expect clone to contain \"s1\", got \(clone)") - } - cloneS1.shouldEqual(mS1) - - clone.delta.shouldNotBeNil() - } } diff --git a/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift b/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift index eba3b15f2..48fa6504a 100644 --- a/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Types/CRDTORSetTests.swift @@ -215,14 +215,4 @@ final class CRDTORSetTests: XCTestCase { s1.reset() s1.isEmpty.shouldBeTrue() } - - func test_clone() throws { - var s = CRDT.ORSet(replicaID: self.replicaA) - s.add(3) - - let clone = s.clone() - clone.replicaID.shouldEqual(s.replicaID) - clone.elements.shouldEqual(s.elements) - clone.delta.shouldNotBeNil() - } } From 8d2ac8832690472d49c87a718f9dfb9e693c8ace Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 29 Apr 2020 01:27:10 -0700 Subject: [PATCH 4/5] remove defaultValue from proto messages --- Protos/CRDT/CRDT.proto | 12 ++-- .../CRDT/Protobuf/CRDT+Serialization.swift | 24 +------ .../CRDT/Protobuf/CRDT.pb.swift | 64 +++++-------------- .../CRDT/Types/CRDT+ORMap.swift | 35 ++++++++-- .../CRDT/Types/CRDT+StateBased.swift | 2 +- .../Protobuf/CRDT+SerializationTests.swift | 2 + 6 files changed, 53 insertions(+), 86 deletions(-) diff --git a/Protos/CRDT/CRDT.proto b/Protos/CRDT/CRDT.proto index 383be9401..20a0ea1d9 100644 --- a/Protos/CRDT/CRDT.proto +++ b/Protos/CRDT/CRDT.proto @@ -96,17 +96,15 @@ message CRDTORMapKeyValue { message CRDTORMap { message Delta { - CRDTORMapValue defaultValue = 1; - CRDTVersionedContainerDelta keys = 2; - repeated CRDTORMapKeyValue values = 3; + CRDTVersionedContainerDelta keys = 1; + repeated CRDTORMapKeyValue values = 2; } VersionReplicaID replicaID = 1; - CRDTORMapValue defaultValue = 2; - CRDTORSet keys = 3; - repeated CRDTORMapKeyValue values = 4; + CRDTORSet keys = 2; + repeated CRDTORMapKeyValue values = 3; // Delta is derived from `updatedValues` - repeated CRDTORMapKeyValue updatedValues = 5; + repeated CRDTORMapKeyValue updatedValues = 4; } // ***** CRDT.LWWRegister ***** diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift index 5fbbf70e4..a15a1fd69 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift @@ -310,15 +310,9 @@ extension CRDT.ORMap: ProtobufRepresentable { public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { var proto = ProtobufRepresentation() proto.replicaID = try self.replicaID.toProto(context: context) - - let serializedDefaultValue = try context.serialization.serialize(self.defaultValue) - proto.defaultValue.manifest = try serializedDefaultValue.manifest.toProto(context: context) - proto.defaultValue.payload = serializedDefaultValue.buffer.readData() - proto.keys = try self._keys.toProto(context: context) proto.values = try ORMapSerializationUtils.valuesToProto(self._values, context: context) proto.updatedValues = try ORMapSerializationUtils.valuesToProto(self.updatedValues, context: context) - return proto } @@ -328,11 +322,6 @@ extension CRDT.ORMap: ProtobufRepresentable { } self.replicaID = try ReplicaID(fromProto: proto.replicaID, context: context) - guard proto.hasDefaultValue else { - throw SerializationError.missingField("defaultValue", type: String(describing: CRDT.ORMap.self)) - } - self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context) - guard proto.hasKeys else { throw SerializationError.missingField("keys", type: String(describing: CRDT.ORMap.self)) } @@ -340,6 +329,7 @@ extension CRDT.ORMap: ProtobufRepresentable { self._values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context) self.updatedValues = try ORMapSerializationUtils.valuesFromProto(proto.updatedValues, context: context) + self.defaultValue = nil // We don't need remote's default value for merge } } @@ -348,29 +338,19 @@ extension CRDT.ORMapDelta: ProtobufRepresentable { public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { var proto = ProtobufRepresentation() - - let serializedDefaultValue = try context.serialization.serialize(self.defaultValue) - proto.defaultValue.manifest = try serializedDefaultValue.manifest.toProto(context: context) - proto.defaultValue.payload = serializedDefaultValue.buffer.readData() - proto.keys = try self.keys.toProto(context: context) proto.values = try ORMapSerializationUtils.valuesToProto(self.values, context: context) - return proto } public init(fromProto proto: ProtobufRepresentation, context: Serialization.Context) throws { - guard proto.hasDefaultValue else { - throw SerializationError.missingField("defaultValue", type: String(describing: CRDT.ORMapDelta.self)) - } - self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context) - guard proto.hasKeys else { throw SerializationError.missingField("keys", type: String(describing: CRDT.ORMapDelta.self)) } self.keys = try CRDT.ORSet.Delta(fromProto: proto.keys, context: context) self.values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context) + self.defaultValue = nil // We don't need remote's default value for merge } } diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift index 6afdcbbf2..2596cec17 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift @@ -342,15 +342,6 @@ public struct ProtoCRDTORMap { /// Clears the value of `replicaID`. Subsequent reads from it will return its default value. public mutating func clearReplicaID() {_uniqueStorage()._replicaID = nil} - public var defaultValue: ProtoCRDTORMapValue { - get {return _storage._defaultValue ?? ProtoCRDTORMapValue()} - set {_uniqueStorage()._defaultValue = newValue} - } - /// Returns true if `defaultValue` has been explicitly set. - public var hasDefaultValue: Bool {return _storage._defaultValue != nil} - /// Clears the value of `defaultValue`. Subsequent reads from it will return its default value. - public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil} - public var keys: ProtoCRDTORSet { get {return _storage._keys ?? ProtoCRDTORSet()} set {_uniqueStorage()._keys = newValue} @@ -378,15 +369,6 @@ public struct ProtoCRDTORMap { // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. - public var defaultValue: ProtoCRDTORMapValue { - get {return _storage._defaultValue ?? ProtoCRDTORMapValue()} - set {_uniqueStorage()._defaultValue = newValue} - } - /// Returns true if `defaultValue` has been explicitly set. - public var hasDefaultValue: Bool {return _storage._defaultValue != nil} - /// Clears the value of `defaultValue`. Subsequent reads from it will return its default value. - public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil} - public var keys: ProtoCRDTVersionedContainerDelta { get {return _storage._keys ?? ProtoCRDTVersionedContainerDelta()} set {_uniqueStorage()._keys = newValue} @@ -1231,15 +1213,13 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement public static let protoMessageName: String = "CRDTORMap" public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ 1: .same(proto: "replicaID"), - 2: .same(proto: "defaultValue"), - 3: .same(proto: "keys"), - 4: .same(proto: "values"), - 5: .same(proto: "updatedValues"), + 2: .same(proto: "keys"), + 3: .same(proto: "values"), + 4: .same(proto: "updatedValues"), ] fileprivate class _StorageClass { var _replicaID: ProtoVersionReplicaID? = nil - var _defaultValue: ProtoCRDTORMapValue? = nil var _keys: ProtoCRDTORSet? = nil var _values: [ProtoCRDTORMapKeyValue] = [] var _updatedValues: [ProtoCRDTORMapKeyValue] = [] @@ -1250,7 +1230,6 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement init(copying source: _StorageClass) { _replicaID = source._replicaID - _defaultValue = source._defaultValue _keys = source._keys _values = source._values _updatedValues = source._updatedValues @@ -1270,10 +1249,9 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement while let fieldNumber = try decoder.nextFieldNumber() { switch fieldNumber { case 1: try decoder.decodeSingularMessageField(value: &_storage._replicaID) - case 2: try decoder.decodeSingularMessageField(value: &_storage._defaultValue) - case 3: try decoder.decodeSingularMessageField(value: &_storage._keys) - case 4: try decoder.decodeRepeatedMessageField(value: &_storage._values) - case 5: try decoder.decodeRepeatedMessageField(value: &_storage._updatedValues) + case 2: try decoder.decodeSingularMessageField(value: &_storage._keys) + case 3: try decoder.decodeRepeatedMessageField(value: &_storage._values) + case 4: try decoder.decodeRepeatedMessageField(value: &_storage._updatedValues) default: break } } @@ -1285,17 +1263,14 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement if let v = _storage._replicaID { try visitor.visitSingularMessageField(value: v, fieldNumber: 1) } - if let v = _storage._defaultValue { - try visitor.visitSingularMessageField(value: v, fieldNumber: 2) - } if let v = _storage._keys { - try visitor.visitSingularMessageField(value: v, fieldNumber: 3) + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) } if !_storage._values.isEmpty { - try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 4) + try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 3) } if !_storage._updatedValues.isEmpty { - try visitor.visitRepeatedMessageField(value: _storage._updatedValues, fieldNumber: 5) + try visitor.visitRepeatedMessageField(value: _storage._updatedValues, fieldNumber: 4) } } try unknownFields.traverse(visitor: &visitor) @@ -1307,7 +1282,6 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement let _storage = _args.0 let rhs_storage = _args.1 if _storage._replicaID != rhs_storage._replicaID {return false} - if _storage._defaultValue != rhs_storage._defaultValue {return false} if _storage._keys != rhs_storage._keys {return false} if _storage._values != rhs_storage._values {return false} if _storage._updatedValues != rhs_storage._updatedValues {return false} @@ -1323,13 +1297,11 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = ProtoCRDTORMap.protoMessageName + ".Delta" public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .same(proto: "defaultValue"), - 2: .same(proto: "keys"), - 3: .same(proto: "values"), + 1: .same(proto: "keys"), + 2: .same(proto: "values"), ] fileprivate class _StorageClass { - var _defaultValue: ProtoCRDTORMapValue? = nil var _keys: ProtoCRDTVersionedContainerDelta? = nil var _values: [ProtoCRDTORMapKeyValue] = [] @@ -1338,7 +1310,6 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp private init() {} init(copying source: _StorageClass) { - _defaultValue = source._defaultValue _keys = source._keys _values = source._values } @@ -1356,9 +1327,8 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp try withExtendedLifetime(_storage) { (_storage: _StorageClass) in while let fieldNumber = try decoder.nextFieldNumber() { switch fieldNumber { - case 1: try decoder.decodeSingularMessageField(value: &_storage._defaultValue) - case 2: try decoder.decodeSingularMessageField(value: &_storage._keys) - case 3: try decoder.decodeRepeatedMessageField(value: &_storage._values) + case 1: try decoder.decodeSingularMessageField(value: &_storage._keys) + case 2: try decoder.decodeRepeatedMessageField(value: &_storage._values) default: break } } @@ -1367,14 +1337,11 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp public func traverse(visitor: inout V) throws { try withExtendedLifetime(_storage) { (_storage: _StorageClass) in - if let v = _storage._defaultValue { - try visitor.visitSingularMessageField(value: v, fieldNumber: 1) - } if let v = _storage._keys { - try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) } if !_storage._values.isEmpty { - try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 3) + try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 2) } } try unknownFields.traverse(visitor: &visitor) @@ -1385,7 +1352,6 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in let _storage = _args.0 let rhs_storage = _args.1 - if _storage._defaultValue != rhs_storage._defaultValue {return false} if _storage._keys != rhs_storage._keys {return false} if _storage._values != rhs_storage._values {return false} return true diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift index f3ffdf117..14b778054 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift @@ -56,7 +56,11 @@ extension CRDT { public let replicaID: ReplicaID - let defaultValue: Value + /// We allow `defaultValue` to be `nil` when we reconstruct `ORMap` from a remote message, + /// but it **is** required in the initializer to ensure that **local** `ORMap` has `defaultValue` + /// for `merge`, `update`, etc. In those methods `defaultValue` is required in case **local** + /// `ORMap` does not have an existing value for the given `key`. + let defaultValue: Value? /// ORSet to maintain causal history of the keys only; values keep their own causal history (if applicable). /// This is for tracking key additions and removals. @@ -104,11 +108,15 @@ extension CRDT { } public mutating func update(key: Key, mutator: (inout Value) -> Void) { + guard let defaultValue = self.defaultValue else { + preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") + } + // Always add `key` to `_keys` set to track its causal history self._keys.add(key) // Apply `mutator` to the value then save it to state. Create `Value` if needed. - var value = self._values[key] ?? self.defaultValue + var value = self._values[key] ?? defaultValue mutator(&value) self._values[key] = value @@ -151,17 +159,25 @@ extension CRDT { } public mutating func merge(other: ORMap) { + guard let defaultValue = self.defaultValue else { + preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") + } + self._keys.merge(other: other._keys) // Use the updated `_keys` to merge `_values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self._values.merge(keys: self._keys.elements, other: other._values, defaultValue: self.defaultValue) + self._values.merge(keys: self._keys.elements, other: other._values, defaultValue: defaultValue) } public mutating func mergeDelta(_ delta: Delta) { + guard let defaultValue = self.defaultValue else { + preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") + } + self._keys.mergeDelta(delta.keys) // Use the updated `_keys` to merge `_values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self._values.merge(keys: self._keys.elements, other: delta.values, defaultValue: self.defaultValue) + self._values.merge(keys: self._keys.elements, other: delta.values, defaultValue: defaultValue) } public mutating func resetDelta() { @@ -176,9 +192,10 @@ extension CRDT { // TODO: `merge` defined in the Dictionary extension below should use `mergeDelta` when Value is DeltaCRDT var values: [Key: Value] - let defaultValue: Value + // See comment in `ORMap` on why this is optional + let defaultValue: Value? - init(keys: ORSet.Delta, values: [Key: Value], defaultValue: Value) { + init(keys: ORSet.Delta, values: [Key: Value], defaultValue: Value?) { self.keys = keys self.values = values self.defaultValue = defaultValue @@ -195,11 +212,15 @@ extension CRDT { } public mutating func merge(other: ORMapDelta) { + guard let defaultValue = self.defaultValue else { + preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") + } + // Merge `keys` first--keys that have been deleted will be gone self.keys.merge(other: other.keys) // Use the updated `keys` to merge `values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: self.defaultValue) + self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: defaultValue) } } } diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift b/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift index a9e88cdce..e85f3c756 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+StateBased.swift @@ -20,7 +20,7 @@ import NIO /// Top-level protocol for any kind of state based CRDT (also known as `CvRDT`). /// -/// ** Note: ** CRDT **must** have value semantics. Assumptions have been made in code with this being true, +/// - Warning: CRDTs MUST have value semantics. Assumptions have been made in code with this being true, /// and there might be undesirable consequences otherwise. public protocol StateBasedCRDT: Codable { /// Attempts to merge the state of the given data type instance into this data type instance. diff --git a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift index e84225e34..ee432eb71 100644 --- a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift @@ -228,6 +228,7 @@ final class CRDTSerializationTests: ActorSystemTestBase { let deserialized = try system.serialization.deserialize(as: CRDT.ORMap>.self, from: serialized) "\(deserialized.replicaID)".shouldContain("actor:sact://CRDTSerializationTests@localhost:9001/user/alpha") + deserialized.defaultValue.shouldBeNil() deserialized._keys.elements.shouldEqual(["s1", "s2"]) deserialized._values.count.shouldEqual(2) @@ -262,6 +263,7 @@ final class CRDTSerializationTests: ActorSystemTestBase { let serialized = try system.serialization.serialize(map.delta!) // !-safe, must have a delta, we just checked it let deserialized = try system.serialization.deserialize(as: CRDT.ORMap>.Delta.self, from: serialized) + deserialized.defaultValue.shouldBeNil() deserialized.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) deserialized.values.count.shouldEqual(2) From d74ecddea84ae33fd1a8a5e3ec3cd5cfd0b599e5 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 30 Apr 2020 11:25:58 +0900 Subject: [PATCH 5/5] Apply suggestions from code review --- Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift index 14b778054..d9cebfcd5 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift @@ -213,7 +213,7 @@ extension CRDT { public mutating func merge(other: ORMapDelta) { guard let defaultValue = self.defaultValue else { - preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") + preconditionFailure("Unable to merge [\(self)] with [\(other)] as 'defaultValue' is not set. This is a bug. Please report.") } // Merge `keys` first--keys that have been deleted will be gone