From 155b6ec04ae4792e73e22fb59e758d95f79571df Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Thu, 30 Apr 2020 16:05:07 -0700 Subject: [PATCH] CRDT.ORMultiMap and LWWMap serialization Motivation: Make all CRDTs serializable. See https://github.com/apple/swift-distributed-actors/issues/507. Modifications: - Add `ProtobufRepresentable` conformance to `CRDT.ORMultiMap`. - Add `ProtobufRepresentable` conformance to `CRDT.LWWMap`. Result: Resolves https://github.com/apple/swift-distributed-actors/issues/507 --- Protos/CRDT/CRDT.proto | 16 ++ .../CRDT/Protobuf/CRDT+Serialization.swift | 44 ++++ .../CRDT/Protobuf/CRDT.pb.swift | 200 ++++++++++++++++++ .../Serialization+SerializerID.swift | 4 + .../Protobuf/CRDT+SerializationTests.swift | 127 +++++++++++ 5 files changed, 391 insertions(+) diff --git a/Protos/CRDT/CRDT.proto b/Protos/CRDT/CRDT.proto index 20a0ea1d9..700b2da54 100644 --- a/Protos/CRDT/CRDT.proto +++ b/Protos/CRDT/CRDT.proto @@ -107,6 +107,22 @@ message CRDTORMap { repeated CRDTORMapKeyValue updatedValues = 4; } +// ***** CRDT.ORMultiMap ***** + +message CRDTORMultiMap { + VersionReplicaID replicaID = 1; + // Includes delta + CRDTORMap state = 2; +} + +// ***** CRDT.LWWMap ***** + +message CRDTLWWMap { + VersionReplicaID replicaID = 1; + // Includes delta + CRDTORMap state = 2; +} + // ***** CRDT.LWWRegister ***** message CRDTLWWRegister { diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift index a15a1fd69..e2c44fa86 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift @@ -354,6 +354,50 @@ extension CRDT.ORMapDelta: ProtobufRepresentable { } } +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: CRDT.ORMultiMap + +extension CRDT.ORMultiMap: ProtobufRepresentable { + public typealias ProtobufRepresentation = ProtoCRDTORMultiMap + + public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { + var proto = ProtobufRepresentation() + proto.replicaID = try self.replicaID.toProto(context: context) + proto.state = try self.state.toProto(context: context) + return proto + } + + public init(fromProto proto: ProtobufRepresentation, context: Serialization.Context) throws { + guard proto.hasReplicaID else { + throw SerializationError.missingField("replicaID", type: String(describing: CRDT.ORMultiMap.self)) + } + self.replicaID = try ReplicaID(fromProto: proto.replicaID, context: context) + self.state = try CRDT.ORMap>(fromProto: proto.state, context: context) + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: CRDT.LWWMap + +extension CRDT.LWWMap: ProtobufRepresentable { + public typealias ProtobufRepresentation = ProtoCRDTLWWMap + + public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation { + var proto = ProtobufRepresentation() + proto.replicaID = try self.replicaID.toProto(context: context) + proto.state = try self.state.toProto(context: context) + return proto + } + + public init(fromProto proto: ProtobufRepresentation, context: Serialization.Context) throws { + guard proto.hasReplicaID else { + throw SerializationError.missingField("replicaID", type: String(describing: CRDT.LWWMap.self)) + } + self.replicaID = try ReplicaID(fromProto: proto.replicaID, context: context) + self.state = try CRDT.ORMap>(fromProto: proto.state, context: context) + } +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: CRDT.LWWRegister diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift index 2596cec17..e549d2793 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift @@ -395,6 +395,68 @@ public struct ProtoCRDTORMap { fileprivate var _storage = _StorageClass.defaultInstance } +public struct ProtoCRDTORMultiMap { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var replicaID: ProtoVersionReplicaID { + get {return _storage._replicaID ?? ProtoVersionReplicaID()} + set {_uniqueStorage()._replicaID = newValue} + } + /// Returns true if `replicaID` has been explicitly set. + public var hasReplicaID: Bool {return _storage._replicaID != nil} + /// Clears the value of `replicaID`. Subsequent reads from it will return its default value. + public mutating func clearReplicaID() {_uniqueStorage()._replicaID = nil} + + /// Includes delta + public var state: ProtoCRDTORMap { + get {return _storage._state ?? ProtoCRDTORMap()} + set {_uniqueStorage()._state = newValue} + } + /// Returns true if `state` has been explicitly set. + public var hasState: Bool {return _storage._state != nil} + /// Clears the value of `state`. Subsequent reads from it will return its default value. + public mutating func clearState() {_uniqueStorage()._state = nil} + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance +} + +public struct ProtoCRDTLWWMap { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var replicaID: ProtoVersionReplicaID { + get {return _storage._replicaID ?? ProtoVersionReplicaID()} + set {_uniqueStorage()._replicaID = newValue} + } + /// Returns true if `replicaID` has been explicitly set. + public var hasReplicaID: Bool {return _storage._replicaID != nil} + /// Clears the value of `replicaID`. Subsequent reads from it will return its default value. + public mutating func clearReplicaID() {_uniqueStorage()._replicaID = nil} + + /// Includes delta + public var state: ProtoCRDTORMap { + get {return _storage._state ?? ProtoCRDTORMap()} + set {_uniqueStorage()._state = newValue} + } + /// Returns true if `state` has been explicitly set. + public var hasState: Bool {return _storage._state != nil} + /// Clears the value of `state`. Subsequent reads from it will return its default value. + public mutating func clearState() {_uniqueStorage()._state = nil} + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + + fileprivate var _storage = _StorageClass.defaultInstance +} + public struct ProtoCRDTLWWRegister { // SwiftProtobuf.Message conformance is added in an extension below. See the // `Message` and `Message+*Additions` files in the SwiftProtobuf library for @@ -1363,6 +1425,144 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp } } +extension ProtoCRDTORMultiMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "CRDTORMultiMap" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "replicaID"), + 2: .same(proto: "state"), + ] + + fileprivate class _StorageClass { + var _replicaID: ProtoVersionReplicaID? = nil + var _state: ProtoCRDTORMap? = nil + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _replicaID = source._replicaID + _state = source._state + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._replicaID) + case 2: try decoder.decodeSingularMessageField(value: &_storage._state) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._replicaID { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if let v = _storage._state { + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTORMultiMap, rhs: ProtoCRDTORMultiMap) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._replicaID != rhs_storage._replicaID {return false} + if _storage._state != rhs_storage._state {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension ProtoCRDTLWWMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = "CRDTLWWMap" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "replicaID"), + 2: .same(proto: "state"), + ] + + fileprivate class _StorageClass { + var _replicaID: ProtoVersionReplicaID? = nil + var _state: ProtoCRDTORMap? = nil + + static let defaultInstance = _StorageClass() + + private init() {} + + init(copying source: _StorageClass) { + _replicaID = source._replicaID + _state = source._state + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + + public mutating func decodeMessage(decoder: inout D) throws { + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + switch fieldNumber { + case 1: try decoder.decodeSingularMessageField(value: &_storage._replicaID) + case 2: try decoder.decodeSingularMessageField(value: &_storage._state) + default: break + } + } + } + } + + public func traverse(visitor: inout V) throws { + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if let v = _storage._replicaID { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } + if let v = _storage._state { + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + } + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: ProtoCRDTLWWMap, rhs: ProtoCRDTLWWMap) -> Bool { + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._replicaID != rhs_storage._replicaID {return false} + if _storage._state != rhs_storage._state {return false} + return true + } + if !storagesAreEqual {return false} + } + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + extension ProtoCRDTLWWRegister: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = "CRDTLWWRegister" public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ diff --git a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift index fa40dab91..720edbee2 100644 --- a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift +++ b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift @@ -116,6 +116,10 @@ extension Serialization { internal static let CRDTORSetDelta: SerializerID = .protobufRepresentable internal static let CRDTORMap: SerializerID = .protobufRepresentable internal static let CRDTORMapDelta: SerializerID = .protobufRepresentable + internal static let CRDTORMultiMap: SerializerID = .protobufRepresentable + internal static let CRDTORMultiMapDelta: SerializerID = .protobufRepresentable + internal static let CRDTLWWMap: SerializerID = .protobufRepresentable + internal static let CRDTLWWMapDelta: SerializerID = .protobufRepresentable internal static let CRDTLWWRegister: SerializerID = .protobufRepresentable internal static let ConvergentGossipMembership: SerializerID = .foundationJSON diff --git a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift index ee432eb71..e210a1ca2 100644 --- a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift @@ -27,7 +27,12 @@ final class CRDTSerializationTests: ActorSystemTestBase { settings.serialization.register(CRDT.ORSet.Delta.self, serializerID: Serialization.ReservedID.CRDTORSetDelta) settings.serialization.register(CRDT.ORMap>.self, serializerID: Serialization.ReservedID.CRDTORMap) settings.serialization.register(CRDT.ORMap>.Delta.self, serializerID: Serialization.ReservedID.CRDTORMapDelta) + settings.serialization.register(CRDT.ORMultiMap.self, serializerID: Serialization.ReservedID.CRDTORMultiMap) + settings.serialization.register(CRDT.ORMultiMap.Delta.self, serializerID: Serialization.ReservedID.CRDTORMultiMapDelta) + settings.serialization.register(CRDT.LWWMap.self, serializerID: Serialization.ReservedID.CRDTLWWMap) + settings.serialization.register(CRDT.LWWMap.Delta.self, serializerID: Serialization.ReservedID.CRDTLWWMapDelta) settings.serialization.register(CRDT.LWWRegister.self, serializerID: Serialization.ReservedID.CRDTLWWRegister) + settings.serialization.register(CRDT.LWWRegister.self, serializerID: Serialization.ReservedID.CRDTLWWRegister) } } @@ -280,6 +285,128 @@ final class CRDTSerializationTests: ActorSystemTestBase { } } + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: ORMultiMap + + func test_serializationOf_ORMultiMap() throws { + try shouldNotThrow { + var map = CRDT.ORMultiMap(replicaID: .actorAddress(self.ownerAlpha)) + map.add(forKey: "s1", "a") + map.add(forKey: "s2", "b") + map.add(forKey: "s1", "c") + map.delta.shouldNotBeNil() + + let serialized = try system.serialization.serialize(map) + let deserialized = try system.serialization.deserialize(as: CRDT.ORMultiMap.self, from: serialized) + + "\(deserialized.replicaID)".shouldContain("actor:sact://CRDTSerializationTests@localhost:9001/user/alpha") + deserialized.state._keys.elements.shouldEqual(["s1", "s2"]) + deserialized.state._values.count.shouldEqual(2) + + guard let s1 = deserialized["s1"] else { + throw shouldNotHappen("Expect deserialized to contain \"s1\", got \(deserialized)") + } + s1.shouldEqual(["a", "c"]) + + guard let s2 = deserialized["s2"] else { + throw shouldNotHappen("Expect deserialized to contain \"s2\", got \(deserialized)") + } + s2.shouldEqual(["b"]) + + deserialized.delta.shouldNotBeNil() + deserialized.delta!.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) // same elements added to delta + deserialized.delta!.values.count.shouldEqual(2) + } + } + + func test_serializationOf_ORMultiMap_delta() throws { + try shouldNotThrow { + var map = CRDT.ORMultiMap(replicaID: .actorAddress(self.ownerAlpha)) + map.add(forKey: "s1", "a") + map.add(forKey: "s2", "b") + map.add(forKey: "s1", "c") + map.delta.shouldNotBeNil() + + let serialized = try system.serialization.serialize(map.delta!) // !-safe, must have a delta, we just checked it + let deserialized = try system.serialization.deserialize(as: CRDT.ORMultiMap.Delta.self, from: serialized) + + // Delta is just `ORMapDelta` + deserialized.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) + deserialized.values.count.shouldEqual(2) + + // delta contains the same elements as map + guard let s1 = deserialized.values["s1"] else { + throw shouldNotHappen("Expect deserialized to contain \"s1\", got \(deserialized)") + } + s1.elements.shouldEqual(["a", "c"]) + + guard let s2 = deserialized.values["s2"] else { + throw shouldNotHappen("Expect deserialized to contain \"s2\", got \(deserialized)") + } + s2.elements.shouldEqual(["b"]) + } + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: LWWMap + + func test_serializationOf_LWWMap() throws { + try shouldNotThrow { + var map = CRDT.LWWMap(replicaID: .actorAddress(self.ownerAlpha), defaultValue: "") + map.set(forKey: "foo", value: "a") + map.set(forKey: "bar", value: "b") + map.delta.shouldNotBeNil() + + let serialized = try system.serialization.serialize(map) + let deserialized = try system.serialization.deserialize(as: CRDT.LWWMap.self, from: serialized) + + "\(deserialized.replicaID)".shouldContain("actor:sact://CRDTSerializationTests@localhost:9001/user/alpha") + deserialized.state._keys.elements.shouldEqual(["foo", "bar"]) + deserialized.state._values.count.shouldEqual(2) + + guard let foo = deserialized["foo"] else { + throw shouldNotHappen("Expect deserialized to contain \"foo\", got \(deserialized)") + } + foo.shouldEqual("a") + + guard let bar = deserialized["bar"] else { + throw shouldNotHappen("Expect deserialized to contain \"bar\", got \(deserialized)") + } + bar.shouldEqual("b") + + deserialized.delta.shouldNotBeNil() + deserialized.delta!.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) // same elements added to delta + deserialized.delta!.values.count.shouldEqual(2) + } + } + + func test_serializationOf_LWWMap_delta() throws { + try shouldNotThrow { + var map = CRDT.LWWMap(replicaID: .actorAddress(self.ownerAlpha), defaultValue: "") + map.set(forKey: "foo", value: "a") + map.set(forKey: "bar", value: "b") + map.delta.shouldNotBeNil() + + let serialized = try system.serialization.serialize(map.delta!) // !-safe, must have a delta, we just checked it + let deserialized = try system.serialization.deserialize(as: CRDT.LWWMap.Delta.self, from: serialized) + + // Delta is just `ORMapDelta` + deserialized.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) + deserialized.values.count.shouldEqual(2) + + // delta contains the same elements as map + guard let foo = deserialized.values["foo"] else { + throw shouldNotHappen("Expect deserialized to contain \"foo\", got \(deserialized)") + } + foo.value.shouldEqual("a") + + guard let bar = deserialized.values["bar"] else { + throw shouldNotHappen("Expect deserialized to contain \"bar\", got \(deserialized)") + } + bar.value.shouldEqual("b") + } + } + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: LWWRegister