Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Protos/CRDT/CRDT.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,36 @@ message CRDTORSet {
CRDTVersionedContainer state = 2;
}

// ***** CRDT.ORMap *****

message CRDTORMapKey {
Manifest manifest = 1;
bytes payload = 2;
}

message CRDTORMapValue {
Manifest manifest = 1;
bytes payload = 2;
}

message CRDTORMapKeyValue {
CRDTORMapKey key = 1;
CRDTORMapValue value = 2;
}

message CRDTORMap {
message Delta {
CRDTVersionedContainerDelta keys = 1;
repeated CRDTORMapKeyValue values = 2;
}

VersionReplicaID replicaID = 1;
CRDTORSet keys = 2;
repeated CRDTORMapKeyValue values = 3;
// Delta is derived from `updatedValues`
repeated CRDTORMapKeyValue updatedValues = 4;
}

// ***** CRDT.LWWRegister *****

message CRDTLWWRegister {
Expand Down
107 changes: 107 additions & 0 deletions Sources/DistributedActors/CRDT/Protobuf/CRDT+Serialization.swift
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,113 @@ extension CRDT.ORSet: ProtobufRepresentable {
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: CRDT.ORMap

private enum ORMapSerializationUtils {
static func keyToProto<Key: Codable & Hashable>(_ key: Key, context: Serialization.Context) throws -> ProtoCRDTORMapKey {
let serialized = try context.serialization.serialize(key)
var proto = ProtoCRDTORMapKey()
proto.manifest = try serialized.manifest.toProto(context: context)
proto.payload = serialized.buffer.readData()
return proto
}

static func keyFromProto<Key: Codable & Hashable>(_ proto: ProtoCRDTORMapKey, context: Serialization.Context) throws -> Key {
try context.serialization.deserialize(
as: Key.self,
from: .data(proto.payload),
using: Serialization.Manifest(fromProto: proto.manifest, context: context)
)
}

static func valueToProto<Value: CvRDT>(_ value: Value, context: Serialization.Context) throws -> ProtoCRDTORMapValue {
let serialized = try context.serialization.serialize(value)
var proto = ProtoCRDTORMapValue()
proto.manifest = try serialized.manifest.toProto(context: context)
proto.payload = serialized.buffer.readData()
return proto
}

static func valueFromProto<Value: CvRDT>(_ proto: ProtoCRDTORMapValue, context: Serialization.Context) throws -> Value {
try context.serialization.deserialize(
as: Value.self,
from: .data(proto.payload),
using: Serialization.Manifest(fromProto: proto.manifest, context: context)
)
}

static func valuesToProto<Key: Codable & Hashable, Value: CvRDT>(_ values: [Key: Value], context: Serialization.Context) throws -> [ProtoCRDTORMapKeyValue] {
try values.map { key, value in
var proto = ProtoCRDTORMapKeyValue()
proto.key = try keyToProto(key, context: context)
proto.value = try valueToProto(value, context: context)
return proto
}
}

static func valuesFromProto<Key: Codable & Hashable, Value: CvRDT>(_ proto: [ProtoCRDTORMapKeyValue], context: Serialization.Context) throws -> [Key: Value] {
try proto.reduce(into: [Key: Value]()) { result, protoKeyValue in
guard protoKeyValue.hasKey else {
throw SerializationError.missingField("key", type: String(describing: [Key: Value].self))
}
let key: Key = try keyFromProto(protoKeyValue.key, context: context)
let value: Value = try valueFromProto(protoKeyValue.value, context: context)
result[key] = value
}
}
}

extension CRDT.ORMap: ProtobufRepresentable {
public typealias ProtobufRepresentation = ProtoCRDTORMap

public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation {
var proto = ProtobufRepresentation()
proto.replicaID = try self.replicaID.toProto(context: context)
proto.keys = try self._keys.toProto(context: context)
proto.values = try ORMapSerializationUtils.valuesToProto(self._values, context: context)
proto.updatedValues = try ORMapSerializationUtils.valuesToProto(self.updatedValues, context: context)
return proto
}

public init(fromProto proto: ProtoCRDTORMap, context: Serialization.Context) throws {
guard proto.hasReplicaID else {
throw SerializationError.missingField("replicaID", type: String(describing: CRDT.ORMap<Key, Value>.self))
}
self.replicaID = try ReplicaID(fromProto: proto.replicaID, context: context)

guard proto.hasKeys else {
throw SerializationError.missingField("keys", type: String(describing: CRDT.ORMap<Key, Value>.self))
}
self._keys = try CRDT.ORSet<Key>(fromProto: proto.keys, context: context)

self._values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context)
self.updatedValues = try ORMapSerializationUtils.valuesFromProto(proto.updatedValues, context: context)
self.defaultValue = nil // We don't need remote's default value for merge
}
}

extension CRDT.ORMapDelta: ProtobufRepresentable {
public typealias ProtobufRepresentation = ProtoCRDTORMap.Delta

public func toProto(context: Serialization.Context) throws -> ProtobufRepresentation {
var proto = ProtobufRepresentation()
proto.keys = try self.keys.toProto(context: context)
proto.values = try ORMapSerializationUtils.valuesToProto(self.values, context: context)
return proto
}

public init(fromProto proto: ProtobufRepresentation, context: Serialization.Context) throws {
guard proto.hasKeys else {
throw SerializationError.missingField("keys", type: String(describing: CRDT.ORMapDelta<Key, Value>.self))
}
self.keys = try CRDT.ORSet<Key>.Delta(fromProto: proto.keys, context: context)

self.values = try ORMapSerializationUtils.valuesFromProto(proto.values, context: context)
self.defaultValue = nil // We don't need remote's default value for merge
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: CRDT.LWWRegister

Expand Down
Loading