diff --git a/.gitignore b/.gitignore index eee918ebf..a405cf5b8 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ Samples/swift-distributed-actors-samples.xcodeproj /.build /Samples/.build +Instruments/ActorInstruments/build/ /.SourceKitten /Packages .xcode diff --git a/Protos/CRDT/CRDT.proto b/Protos/CRDT/CRDT.proto index 700b2da54..e013ab872 100644 --- a/Protos/CRDT/CRDT.proto +++ b/Protos/CRDT/CRDT.proto @@ -98,6 +98,7 @@ message CRDTORMap { message Delta { CRDTVersionedContainerDelta keys = 1; repeated CRDTORMapKeyValue values = 2; + CRDTORMapValue defaultValue = 3; } VersionReplicaID replicaID = 1; @@ -105,6 +106,7 @@ message CRDTORMap { repeated CRDTORMapKeyValue values = 3; // Delta is derived from `updatedValues` repeated CRDTORMapKeyValue updatedValues = 4; + CRDTORMapValue defaultValue = 5; } // ***** CRDT.ORMultiMap ***** diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift index b525cf552..6c191d3e5 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift @@ -319,6 +319,7 @@ extension CRDT.ORMap: ProtobufRepresentable { proto.keys = try self._keys.toProto(context: context) proto.values = try ORMapSerializationUtils.valuesToProto(self._storage, context: context) proto.updatedValues = try ORMapSerializationUtils.valuesToProto(self.updatedValues, context: context) + proto.defaultValue = try ORMapSerializationUtils.valueToProto(self.defaultValue, context: context) return proto } @@ -335,7 +336,7 @@ extension CRDT.ORMap: ProtobufRepresentable { self._storage = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context) self.updatedValues = try ORMapSerializationUtils.valuesFromProto(proto.updatedValues, context: context) - self.defaultValue = nil // We don't need remote's default value for merge + self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context) } } @@ -346,6 +347,7 @@ extension CRDT.ORMapDelta: ProtobufRepresentable { var proto = ProtobufRepresentation() proto.keys = try self.keys.toProto(context: context) proto.values = try ORMapSerializationUtils.valuesToProto(self.values, context: context) + proto.defaultValue = try ORMapSerializationUtils.valueToProto(self.defaultValue, context: context) return proto } @@ -356,7 +358,7 @@ extension CRDT.ORMapDelta: ProtobufRepresentable { self.keys = try CRDT.ORSet.Delta(fromProto: proto.keys, context: context) self.values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context) - self.defaultValue = nil // We don't need remote's default value for merge + self.defaultValue = try ORMapSerializationUtils.valueFromProto(proto.defaultValue, context: context) } } diff --git a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift index e549d2793..e34be898e 100644 --- a/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift +++ b/Sources/DistributedActors/CRDT/Protobuf/CRDT.pb.swift @@ -362,6 +362,15 @@ public struct ProtoCRDTORMap { set {_uniqueStorage()._updatedValues = newValue} } + public var defaultValue: ProtoCRDTORMapValue { + get {return _storage._defaultValue ?? ProtoCRDTORMapValue()} + set {_uniqueStorage()._defaultValue = newValue} + } + /// Returns true if `defaultValue` has been explicitly set. + public var hasDefaultValue: Bool {return _storage._defaultValue != nil} + /// Clears the value of `defaultValue`. Subsequent reads from it will return its default value. + public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil} + public var unknownFields = SwiftProtobuf.UnknownStorage() public struct Delta { @@ -383,6 +392,15 @@ public struct ProtoCRDTORMap { set {_uniqueStorage()._values = newValue} } + public var defaultValue: ProtoCRDTORMapValue { + get {return _storage._defaultValue ?? ProtoCRDTORMapValue()} + set {_uniqueStorage()._defaultValue = newValue} + } + /// Returns true if `defaultValue` has been explicitly set. + public var hasDefaultValue: Bool {return _storage._defaultValue != nil} + /// Clears the value of `defaultValue`. Subsequent reads from it will return its default value. + public mutating func clearDefaultValue() {_uniqueStorage()._defaultValue = nil} + public var unknownFields = SwiftProtobuf.UnknownStorage() public init() {} @@ -1278,6 +1296,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement 2: .same(proto: "keys"), 3: .same(proto: "values"), 4: .same(proto: "updatedValues"), + 5: .same(proto: "defaultValue"), ] fileprivate class _StorageClass { @@ -1285,6 +1304,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement var _keys: ProtoCRDTORSet? = nil var _values: [ProtoCRDTORMapKeyValue] = [] var _updatedValues: [ProtoCRDTORMapKeyValue] = [] + var _defaultValue: ProtoCRDTORMapValue? = nil static let defaultInstance = _StorageClass() @@ -1295,6 +1315,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement _keys = source._keys _values = source._values _updatedValues = source._updatedValues + _defaultValue = source._defaultValue } } @@ -1314,6 +1335,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement case 2: try decoder.decodeSingularMessageField(value: &_storage._keys) case 3: try decoder.decodeRepeatedMessageField(value: &_storage._values) case 4: try decoder.decodeRepeatedMessageField(value: &_storage._updatedValues) + case 5: try decoder.decodeSingularMessageField(value: &_storage._defaultValue) default: break } } @@ -1334,6 +1356,9 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement if !_storage._updatedValues.isEmpty { try visitor.visitRepeatedMessageField(value: _storage._updatedValues, fieldNumber: 4) } + if let v = _storage._defaultValue { + try visitor.visitSingularMessageField(value: v, fieldNumber: 5) + } } try unknownFields.traverse(visitor: &visitor) } @@ -1347,6 +1372,7 @@ extension ProtoCRDTORMap: SwiftProtobuf.Message, SwiftProtobuf._MessageImplement if _storage._keys != rhs_storage._keys {return false} if _storage._values != rhs_storage._values {return false} if _storage._updatedValues != rhs_storage._updatedValues {return false} + if _storage._defaultValue != rhs_storage._defaultValue {return false} return true } if !storagesAreEqual {return false} @@ -1361,11 +1387,13 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ 1: .same(proto: "keys"), 2: .same(proto: "values"), + 3: .same(proto: "defaultValue"), ] fileprivate class _StorageClass { var _keys: ProtoCRDTVersionedContainerDelta? = nil var _values: [ProtoCRDTORMapKeyValue] = [] + var _defaultValue: ProtoCRDTORMapValue? = nil static let defaultInstance = _StorageClass() @@ -1374,6 +1402,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp init(copying source: _StorageClass) { _keys = source._keys _values = source._values + _defaultValue = source._defaultValue } } @@ -1391,6 +1420,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp switch fieldNumber { case 1: try decoder.decodeSingularMessageField(value: &_storage._keys) case 2: try decoder.decodeRepeatedMessageField(value: &_storage._values) + case 3: try decoder.decodeSingularMessageField(value: &_storage._defaultValue) default: break } } @@ -1405,6 +1435,9 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp if !_storage._values.isEmpty { try visitor.visitRepeatedMessageField(value: _storage._values, fieldNumber: 2) } + if let v = _storage._defaultValue { + try visitor.visitSingularMessageField(value: v, fieldNumber: 3) + } } try unknownFields.traverse(visitor: &visitor) } @@ -1416,6 +1449,7 @@ extension ProtoCRDTORMap.Delta: SwiftProtobuf.Message, SwiftProtobuf._MessageImp let rhs_storage = _args.1 if _storage._keys != rhs_storage._keys {return false} if _storage._values != rhs_storage._values {return false} + if _storage._defaultValue != rhs_storage._defaultValue {return false} return true } if !storagesAreEqual {return false} diff --git a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift index 4552f55eb..d04d25113 100644 --- a/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift +++ b/Sources/DistributedActors/CRDT/Types/CRDT+ORMap.swift @@ -56,11 +56,9 @@ extension CRDT { public let replicaID: ReplicaID - /// We allow `defaultValue` to be `nil` when we reconstruct `ORMap` from a remote message, - /// but it **is** required in the initializer to ensure that **local** `ORMap` has `defaultValue` - /// for `merge`, `update`, etc. In those methods `defaultValue` is required in case **local** + /// The default value for `merge`, `update`, etc. in case **local** /// `ORMap` does not have an existing value for the given `key`. - let defaultValue: Value? + let defaultValue: Value /// ORSet to maintain causal history of the keys only; values keep their own causal history (if applicable). /// This is for tracking key additions and removals. @@ -110,15 +108,11 @@ extension CRDT { } public mutating func update(key: Key, mutator: (inout Value) -> Void) { - guard let defaultValue = self.defaultValue else { - preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") - } - // Always add `key` to `_keys` set to track its causal history self._keys.insert(key) // Apply `mutator` to the value then save it to state. Create `Value` if needed. - var value = self._storage[key] ?? defaultValue + var value = self._storage[key] ?? self.defaultValue mutator(&value) self._storage[key] = value @@ -161,25 +155,17 @@ extension CRDT { } public mutating func merge(other: ORMap) { - guard let defaultValue = self.defaultValue else { - preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") - } - self._keys.merge(other: other._keys) // Use the updated `_keys` to merge `_values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self._storage.merge(keys: self._keys.elements, other: other._storage, defaultValue: defaultValue) + self._storage.merge(keys: self._keys.elements, other: other._storage, defaultValue: self.defaultValue) } public mutating func mergeDelta(_ delta: Delta) { - guard let defaultValue = self.defaultValue else { - preconditionFailure("'defaultValue' is not set. This is a bug. Please report.") - } - self._keys.mergeDelta(delta.keys) // Use the updated `_keys` to merge `_values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self._storage.merge(keys: self._keys.elements, other: delta.values, defaultValue: defaultValue) + self._storage.merge(keys: self._keys.elements, other: delta.values, defaultValue: self.defaultValue) } public mutating func resetDelta() { @@ -192,13 +178,8 @@ extension CRDT { return false } - switch (self.defaultValue, other.defaultValue) { - case (nil, nil): - () // continue checking - case (.some(let lhs), .some(let rhs)) where lhs.equalState(to: rhs): - () // continue checking - default: - return false // values not equal + guard self.defaultValue.equalState(to: other.defaultValue) else { + return false } guard self._storage.count == other._storage.count else { @@ -230,10 +211,9 @@ extension CRDT { // TODO: `merge` defined in the Dictionary extension below should use `mergeDelta` when Value is DeltaCRDT var values: [Key: Value] - // See comment in `ORMap` on why this is optional - let defaultValue: Value? + let defaultValue: Value - init(keys: ORSet.Delta, values: [Key: Value], defaultValue: Value?) { + init(keys: ORSet.Delta, values: [Key: Value], defaultValue: Value) { self.keys = keys self.values = values self.defaultValue = defaultValue @@ -250,15 +230,11 @@ extension CRDT { } public mutating func merge(other: ORMapDelta) { - guard let defaultValue = self.defaultValue else { - preconditionFailure("Unable to merge [\(self)] with [\(other)] as 'defaultValue' is not set. This is a bug. Please report.") - } - // Merge `keys` first--keys that have been deleted will be gone self.keys.merge(other: other.keys) // Use the updated `keys` to merge `values` dictionaries. // Keys that no longer exist will have their values deleted as well. - self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: defaultValue) + self.values.merge(keys: self.keys.elements, other: other.values, defaultValue: self.defaultValue) } public func equalState(to other: StateBasedCRDT) -> Bool { @@ -266,13 +242,8 @@ extension CRDT { return false } - switch (self.defaultValue, other.defaultValue) { - case (nil, nil): - () // continue checking - case (.some(let lhs), .some(let rhs)) where lhs.equalState(to: rhs): - () // continue checking - default: - return false // values not equal + guard self.defaultValue.equalState(to: other.defaultValue) else { + return false } guard self.values.count == other.values.count else { diff --git a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift index 42d596a4c..09373f085 100644 --- a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift +++ b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift @@ -36,7 +36,7 @@ extension Serialization { case SerializerID.specializedWithTypeHint.value: return "serializerID:specialized(\(self.value))" case SerializerID.foundationJSON.value: - return "serializerID:jsonCodable(\(self.value))" + return "serializerID:foundationJSON(\(self.value))" case SerializerID.foundationPropertyListBinary.value: return "serializerID:foundationPropertyListBinary(\(self.value))" case SerializerID.foundationPropertyListXML.value: diff --git a/Sources/DistributedActors/Serialization/Serialization+Serializers+Codable.swift b/Sources/DistributedActors/Serialization/Serialization+Serializers+Codable.swift index 3ec7c5c37..aa99635fc 100644 --- a/Sources/DistributedActors/Serialization/Serialization+Serializers+Codable.swift +++ b/Sources/DistributedActors/Serialization/Serialization+Serializers+Codable.swift @@ -20,6 +20,35 @@ import Foundation // for Codable // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: JSON +/// Terrible workaround to automatically encode nils correctly on Swift 5.2, where Foundation encoders to not pass .allowFragments +internal protocol __OptionalProtocol { + // Terrible workaround to survive `null` values on "top level" (as perceived by the Coders, since we may call them for values in our message) + func __jsonEncode(using encoder: JSONEncoder) throws -> Data + // Terrible workaround to survive `null` values on "top level" (as perceived by the Coders, since we may call them for values in our message) + static func __jsonDecode(using decoder: JSONDecoder, from: Serialization.Buffer) throws -> Self +} + +extension Optional: __OptionalProtocol where Wrapped: Codable { + func __jsonEncode(using encoder: JSONEncoder) throws -> Data { + switch self { + case .some(let value): + let data = try encoder.encode(value) + return data + case .none: + return "null".data(using: .ascii)! + } + } + + static func __jsonDecode(using decoder: JSONDecoder, from bytes: Serialization.Buffer) throws -> Self { + let data = bytes.readData() + if String(data: data, encoding: .utf8) == "null" { + return nil + } else { + return try decoder.decode(Self.self, from: data) + } + } +} + /// Allows for serialization of messages using the Foundation's `JSONEncoder` and `JSONDecoder`. /// /// - Note: Take care to ensure that both "ends" (sending and receiving members of a cluster) @@ -35,13 +64,35 @@ internal class JSONCodableSerializer: Serializer { } public override func serialize(_ message: Message) throws -> Serialization.Buffer { - let data = try encoder.encode(message) + let data: Data + #if swift(>=5.3) + // It has .allowFragments set by default + data = try encoder.encode(message) + #else + // terrible hack workaround, see __OptionalProtocol for details + if let someOptional = message as? __OptionalProtocol { + data = try someOptional.__jsonEncode(using: self.encoder) + } else { + data = try self.encoder.encode(message) + } + #endif traceLog_Serialization("serialized to: \(data)") return .data(data) } public override func deserialize(from buffer: Serialization.Buffer) throws -> Message { - try self.decoder.decode(Message.self, from: buffer.readData()) + let data = buffer.readData() + #if swift(>=5.3) + // It has .allowFragments set by default + return try self.decoder.decode(Message.self, from: data) + #else + // terrible hack workaround, see __OptionalProtocol for details + if let OptionalMessageType = Message.self as? __OptionalProtocol.Type { + return try OptionalMessageType.__jsonDecode(using: self.decoder, from: buffer) as! Message + } else { + return try self.decoder.decode(Message.self, from: data) + } + #endif } public override func setSerializationContext(_ context: Serialization.Context) { diff --git a/Sources/DistributedActors/Serialization/Serialization+Serializers.swift b/Sources/DistributedActors/Serialization/Serialization+Serializers.swift index 8aeb28f78..ec5aca00c 100644 --- a/Sources/DistributedActors/Serialization/Serialization+Serializers.swift +++ b/Sources/DistributedActors/Serialization/Serialization+Serializers.swift @@ -25,6 +25,7 @@ import Foundation // for Codable /// Kind of like coder / encoder, we'll provide bridges for it // TODO: Document since users need to implement these +// TODO: could be a protocol open class Serializer { public init() {} diff --git a/Sources/DistributedActors/Serialization/Serialization.swift b/Sources/DistributedActors/Serialization/Serialization.swift index dae9ba58c..f8a02a0fe 100644 --- a/Sources/DistributedActors/Serialization/Serialization.swift +++ b/Sources/DistributedActors/Serialization/Serialization.swift @@ -102,8 +102,9 @@ public class Serialization { settings.registerSpecializedSerializer(String.self, hint: "S", serializerID: .specializedWithTypeHint) { allocator in StringSerializer(allocator) } - settings.register(String?.self, hint: "qS") - settings.register(Int?.self, hint: "qI") + + settings.register(String?.self, hint: "qS", serializerID: .foundationJSON) + settings.register(Int?.self, hint: "qI", serializerID: .foundationJSON) // ==== Declare some system messages to be handled with specialized serializers: // system messages diff --git a/Sources/DistributedActors/Serialization/SerializationPool.swift b/Sources/DistributedActors/Serialization/SerializationPool.swift index 9bbe9f50c..530908f2a 100644 --- a/Sources/DistributedActors/Serialization/SerializationPool.swift +++ b/Sources/DistributedActors/Serialization/SerializationPool.swift @@ -82,7 +82,6 @@ public final class SerializationPool { // TODO: collapse those two and only use the instrumentation points, also for metrics self.instrumentation.remoteActorMessageSerializeEnd(id: promise.futureResult, bytes: serialized.buffer.count) self.serialization.metrics.recordSerializationMessageOutbound(recipientPath, serialized.buffer.count) - traceLog_Serialization("OK serialize(\(message), to: \(recipientPath))") return serialized } catch { diff --git a/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift b/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift index 6334fc647..9c9279650 100644 --- a/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift +++ b/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Distributed Actors open source project // -// Copyright (c) 2019 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2019-2020 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -16,12 +16,10 @@ import DistributedActorsTestKit import XCTest -final class CRDTGossipReplicationTests: ClusteredActorSystemsXCTestCase { +final class CRDTGossipReplicationClusteredTests: ClusteredActorSystemsXCTestCase { override func configureLogCapture(settings: inout LogCapture.Settings) { settings.excludeActorPaths = [ "/system/cluster/swim", - "/system/transport.client", - "/system/transport.server", "/system/cluster/gossip", "/system/cluster/leadership", "/system/cluster", @@ -32,6 +30,14 @@ final class CRDTGossipReplicationTests: ClusteredActorSystemsXCTestCase { override func configureActorSystem(settings: inout ActorSystemSettings) { settings.serialization.register(CRDT.ORSet.self) + settings.serialization.register(CRDT.LWWMap.self) + settings.serialization.register(CRDT.LWWRegister.self) + settings.serialization.register(CRDT.LWWMap.self) + settings.serialization.register(CRDT.LWWRegister.self) + settings.serialization.register(CRDT.LWWMap.self) + settings.serialization.register(CRDT.LWWRegister.self) + settings.serialization.register(ThrowingEncodePayload.self) + settings.serialization.register(ThrowingDecodePayload.self) } enum OwnsSetMessage: NonTransportableActorMessage { @@ -69,6 +75,27 @@ final class CRDTGossipReplicationTests: ClusteredActorSystemsXCTestCase { } } + enum OwnsMapMessage: NonTransportableActorMessage { + case set(key: String, value: Value, CRDT.OperationConsistency) + } + + func ownsLWWMap(p: ActorTestProbe>?, defaultValue: Value) -> Behavior> { + .setup { context in + let map: CRDT.ActorOwned> = CRDT.LWWMap.makeOwned(by: context, id: "lwwmap", defaultValue: defaultValue) + map.onUpdate { _, value in + p?.ref.tell(value) + } + + return .receiveMessage { + switch $0 { + case .set(let key, let value, let consistency): + _ = map.set(forKey: key, value: value, writeConsistency: consistency, timeout: .effectivelyInfinite) + } + return .same + } + } + } + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Local only direct writes, end up on other nodes via gossip @@ -99,6 +126,33 @@ final class CRDTGossipReplicationTests: ClusteredActorSystemsXCTestCase { try self.expectSet(probe: p2, expected: ["a", "aa", "b"]) } + func test_gossip_localLWWMapUpdate_toOtherNode() throws { + let configure: (inout ActorSystemSettings) -> Void = { settings in + settings.crdt.gossipInterval = .seconds(1) + settings.crdt.gossipIntervalRandomFactor = 0 // no random factor, exactly 1second intervals + } + let first = self.setUpNode("first", configure) + let second = self.setUpNode("second", configure) + try self.joinNodes(node: first, with: second, ensureMembers: .up) + + let p1 = self.testKit(first).spawnTestProbe("probe-one", expecting: CRDT.LWWMap.self) + let p2 = self.testKit(second).spawnTestProbe("probe-two", expecting: CRDT.LWWMap.self) + + let one = try first.spawn("one", self.ownsLWWMap(p: p1, defaultValue: .none)) + let two = try second.spawn("two", self.ownsLWWMap(p: p2, defaultValue: .none)) + + one.tell(.set(key: "a", value: "foo", .local)) + one.tell(.set(key: "aa", value: .none, .local)) + + try self.expectMap(probe: p1, expected: ["a": "foo", "aa": .none]) + try self.expectMap(probe: p2, expected: ["a": "foo", "aa": .none]) + + two.tell(.set(key: "b", value: "bar", .local)) + + try self.expectMap(probe: p1, expected: ["a": "foo", "aa": .none, "b": "bar"]) + try self.expectMap(probe: p2, expected: ["a": "foo", "aa": .none, "b": "bar"]) + } + func test_gossip_readAll_gossipedOwnerAlwaysIncludesAddress() throws { let configure: (inout ActorSystemSettings) -> Void = { settings in settings.crdt.gossipInterval = .seconds(1) @@ -152,6 +206,80 @@ final class CRDTGossipReplicationTests: ClusteredActorSystemsXCTestCase { } } + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Serialization errors + + struct ThrowingEncodePayload: Equatable, Codable { + static let message = "Cannot serialize \(ThrowingEncodePayload.self)!!!" + + init() {} + + init(from decoder: Decoder) throws {} + + func encode(to encoder: Encoder) throws { + throw SerializationError.unableToSerialize(hint: Self.message) + } + } + + struct ThrowingDecodePayload: Equatable, Codable { + static let message = "Cannot deserialize \(ThrowingDecodePayload.self)!!!" + + let value: String = "foobar" + + enum CodingKeys: CodingKey { + case value + } + + init() {} + + init(from decoder: Decoder) throws { + throw SerializationError.unableToDeserialize(hint: Self.message) + } + + func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encode(value, forKey: .value) + } + } + + func test_gossip_failureToSerializeShouldLog_ThrowingCodablePayload() throws { + let configure: (inout ActorSystemSettings) -> Void = { settings in + settings.crdt.gossipInterval = .seconds(1) + settings.crdt.gossipIntervalRandomFactor = 0 // no random factor, exactly 1second intervals + } + let first = self.setUpNode("first", configure) + let second = self.setUpNode("second", configure) + try self.joinNodes(node: first, with: second, ensureMembers: .up) + + let one = try first.spawn("one", self.ownsLWWMap(p: nil, defaultValue: ThrowingEncodePayload())) + try second.spawn("two", self.ownsLWWMap(p: nil, defaultValue: ThrowingEncodePayload())) + + one.tell(.set(key: "a", value: ThrowingEncodePayload(), .local)) + one.tell(.set(key: "aa", value: ThrowingEncodePayload(), .local)) + + try self.capturedLogs(of: first).awaitLogContaining(self.testKit(first), text: ThrowingEncodePayload.message) + } + + func test_gossip_failureToDeserializeShouldLog_ThrowingDecodePayload() throws { + let configure: (inout ActorSystemSettings) -> Void = { settings in + settings.crdt.gossipInterval = .seconds(1) + settings.crdt.gossipIntervalRandomFactor = 0 // no random factor, exactly 1second intervals + } + let first = self.setUpNode("first", configure) + let second = self.setUpNode("second", configure) + try self.joinNodes(node: first, with: second, ensureMembers: .up) + + let p2 = self.testKit(second).spawnTestProbe("probe-two", expecting: CRDT.LWWMap.self) + + let one = try first.spawn("one", self.ownsLWWMap(p: nil, defaultValue: ThrowingDecodePayload())) + try second.spawn("two", self.ownsLWWMap(p: p2, defaultValue: ThrowingDecodePayload())) + + one.tell(.set(key: "a", value: ThrowingDecodePayload(), .local)) + one.tell(.set(key: "aa", value: ThrowingDecodePayload(), .local)) + + try self.capturedLogs(of: second).awaitLogContaining(self.testKit(second), text: ThrowingDecodePayload.message) + } + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Gossip stop conditions @@ -230,4 +358,17 @@ final class CRDTGossipReplicationTests: ClusteredActorSystemsXCTestCase { } } } + + private func expectMap(probe: ActorTestProbe>, expected: [String: String?], file: StaticString = #file, line: UInt = #line) throws { + let testKit: ActorTestKit = self._testKits.first! + + try testKit.eventually(within: .seconds(10)) { + let replicated: CRDT.LWWMap = try probe.expectMessage(within: .seconds(10), file: file, line: line) + pinfo("[\(probe.name)] received updated crdt: \(replicated)") + + guard expected == replicated.underlying else { + throw testKit.error("Expected: \(expected) but got \(replicated)", file: file, line: line) + } + } + } } diff --git a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift index 3dd9dd8d0..84521cfdf 100644 --- a/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift +++ b/Tests/DistributedActorsTests/CRDT/Protobuf/CRDT+SerializationTests.swift @@ -214,7 +214,6 @@ final class CRDTSerializationTests: ActorSystemXCTestCase { let deserialized = try system.serialization.deserialize(as: CRDT.ORMap>.self, from: serialized) "\(deserialized.replicaID)".shouldContain("actor:sact://CRDTSerializationTests@127.0.0.1:9001/user/alpha") - deserialized.defaultValue.shouldBeNil() deserialized._keys.elements.shouldEqual(["s1", "s2"]) deserialized._storage.count.shouldEqual(2) @@ -247,7 +246,6 @@ final class CRDTSerializationTests: ActorSystemXCTestCase { let serialized = try system.serialization.serialize(map.delta!) // !-safe, must have a delta, we just checked it let deserialized = try system.serialization.deserialize(as: CRDT.ORMap>.Delta.self, from: serialized) - deserialized.defaultValue.shouldBeNil() deserialized.keys.elementByBirthDot.count.shouldEqual(map.delta!.keys.elementByBirthDot.count) deserialized.values.count.shouldEqual(2)