Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion KANBAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Ship a working single-node sync engine. One OCaml server, SQLite persistence, Ty

## Core CRDT (OCaml)

- [ ] **CORE-1** Op envelope type + per-zone lamport infrastructure + UUID v7 generation
- [x] **CORE-1** Op envelope type + per-zone lamport infrastructure + UUID v7 generation
- [ ] **CORE-2** Map primitive (with slot safety: `initOnce`, `live`, `replace`, orphan event)
- [ ] **CORE-3** List primitive
- [ ] **CORE-4** Text primitive (codepoint identity, char_id, UTF-8 in/out)
Expand Down
3 changes: 3 additions & 0 deletions core/lib/auth/crdtsync_auth.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@
Lands per SERVER-5, SERVER-6, WIRE-4 (see KANBAN.md). Design: see ARCHITECTURE.md, sections
"Authentication", "Authorization". *)

(* Wrapper module for the [crdtsync_auth] library. Submodules from sibling files must be
re-exported here to be reachable as [Crdtsync_auth.<Submodule>] from outside. *)

let version = "0.0.0"
3 changes: 3 additions & 0 deletions core/lib/blob/crdtsync_blob.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@
Lands per BLOB-1 through BLOB-4 (see KANBAN.md). Design: see ARCHITECTURE.md, section "Binary
Blobs". *)

(* Wrapper module for the [crdtsync_blob] library. Submodules from sibling files must be
re-exported here to be reachable as [Crdtsync_blob.<Submodule>] from outside. *)

let version = "0.0.0"
12 changes: 12 additions & 0 deletions core/lib/crdt/crdtsync_crdt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,16 @@
Implementations land in CORE-2 through CORE-6 (see KANBAN.md). Design: see ARCHITECTURE.md,
sections "CRDT Model", "Map Slot Safety", "Anchors and Element IDs", "Text and Unicode". *)

(* Wrapper module for the [crdtsync_crdt] library. Because a file matching the library name
exists, dune does NOT auto-export sibling modules — they must be re-exported here to be
reachable as [Crdtsync_crdt.<Submodule>] from outside. Add new lines here as submodules
land. *)

module Uuid_v7 = Uuid_v7
module Op_id = Op_id
module Lamport = Lamport
module Wall_time = Wall_time
module Op = Op
module Envelope = Envelope

let version = "0.0.0"
2 changes: 1 addition & 1 deletion core/lib/crdt/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name crdtsync_crdt)
(public_name crdtsync_core.crdt)
(libraries logs))
(libraries logs uuidm unix))
62 changes: 62 additions & 0 deletions core/lib/crdt/envelope.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
(** Op envelope. *)

type tx_role = Member | Commit

let pp_tx_role (fmt : Format.formatter) (r : tx_role) : unit =
Format.pp_print_string fmt (match r with Member -> "member" | Commit -> "commit")

let equal_tx_role (a : tx_role) (b : tx_role) : bool =
match (a, b) with Member, Member | Commit, Commit -> true | _ -> false

type t = {
op_id : Op_id.t;
actor_id : string;
room : string;
branch : string;
zone : string;
schema_version : int;
lamport : Lamport.t;
wall_time : Wall_time.t;
op : Op.kind;
tx : (Uuid_v7.t * tx_role) option;
}

let make ~op_id ~actor_id ~room ~branch ~zone ~schema_version ~lamport ~wall_time ~op ?tx () : t =
{ op_id; actor_id; room; branch; zone; schema_version; lamport; wall_time; op; tx }

let equal_tx (a : (Uuid_v7.t * tx_role) option) (b : (Uuid_v7.t * tx_role) option) : bool =
Option.equal
(fun (id_a, role_a) (id_b, role_b) ->
Uuid_v7.compare id_a id_b = 0 && equal_tx_role role_a role_b)
a b

let equal (a : t) (b : t) : bool =
Op_id.equal a.op_id b.op_id
&& String.equal a.actor_id b.actor_id
&& String.equal a.room b.room && String.equal a.branch b.branch && String.equal a.zone b.zone
&& Int.equal a.schema_version b.schema_version
&& Lamport.equal a.lamport b.lamport
&& Wall_time.equal a.wall_time b.wall_time
&& Op.equal_kind a.op b.op && equal_tx a.tx b.tx

let pp_tx (fmt : Format.formatter) (tx : (Uuid_v7.t * tx_role) option) : unit =
match tx with
| None -> Format.pp_print_string fmt "None"
| Some (id, role) -> Format.fprintf fmt "Some(%a, %a)" Uuid_v7.pp id pp_tx_role role

let pp (fmt : Format.formatter) (e : t) : unit =
Format.fprintf fmt
"@[<v 2>Envelope {@,\
op_id = %a;@,\
actor_id = %S;@,\
room = %S;@,\
branch = %S;@,\
zone = %S;@,\
schema_version = %d;@,\
lamport = %a;@,\
wall_time = %a;@,\
op = %a;@,\
tx = %a;@]@,\
}"
Op_id.pp e.op_id e.actor_id e.room e.branch e.zone e.schema_version Lamport.pp e.lamport
Wall_time.pp e.wall_time Op.pp_kind e.op pp_tx e.tx
48 changes: 48 additions & 0 deletions core/lib/crdt/envelope.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
(** Op envelope.

Immutable record carried with every operation. Mirrors the wire shape described in
ARCHITECTURE.md, section "Internal Data Model".

Transaction membership: in the wire format, [tx_id] and [tx_role] appear iff together. Here they
are folded into a single optional pair to make the invalid state unrepresentable. *)

type tx_role = Member (** part of an open tx *) | Commit (** commit marker; closes the tx *)

val pp_tx_role : Format.formatter -> tx_role -> unit
val equal_tx_role : tx_role -> tx_role -> bool

type t = private {
op_id : Op_id.t;
actor_id : string;
room : string;
branch : string;
zone : string;
schema_version : int;
lamport : Lamport.t;
wall_time : Wall_time.t;
op : Op.kind;
tx : (Uuid_v7.t * tx_role) option;
(** [Some (tx_id, role)] iff member of a tx; [tx_id] is client-generated UUID v7 *)
}
(** Read-only record. Construct via {!make}. *)

val make :
op_id:Op_id.t ->
actor_id:string ->
room:string ->
branch:string ->
zone:string ->
schema_version:int ->
lamport:Lamport.t ->
wall_time:Wall_time.t ->
op:Op.kind ->
?tx:Uuid_v7.t * tx_role ->
unit ->
t
(** Construct an envelope. [?tx] omitted = standalone op (no transaction). *)

val equal : t -> t -> bool
(** Structural equality across all fields. *)

val pp : Format.formatter -> t -> unit
(** Multi-line human-readable dump. Format is debug-only, not stable. *)
14 changes: 14 additions & 0 deletions core/lib/crdt/lamport.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
(** Lamport clock. *)

type t = int64

let zero : t = 0L
let tick (l : t) : t = Int64.succ l
let merge ~recv:r ~local:l : t = Int64.max r l |> Int64.succ
let compare = Int64.compare
let equal = Int64.equal
let pp (fmt : Format.formatter) (l : t) : unit = Format.fprintf fmt "%Ld" l
let to_int64 (l : t) : int64 = l

let of_int64 (i : int64) : t =
if i < 0L then failwith "Lamport.of_int64: negative value not allowed" else i
36 changes: 36 additions & 0 deletions core/lib/crdt/lamport.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
(** Lamport clock.

Per-zone logical clock. The module exposes a single-clock primitive; callers own one [Lamport.t]
per zone. Internally a nonnegative [int64] (values [0L .. Int64.max_int]); at 1 tick/ns that
bound is ~292 years, so the wrap case is not a practical concern. See ARCHITECTURE.md, section
"Algorithms and Invariants". *)

type t
(** Abstract, nonnegative. Construct via {!zero} / {!of_int64}; advance via {!tick} / {!merge};
project for the wire via {!to_int64}. *)

val zero : t
(** Initial clock value. *)

val tick : t -> t
(** [tick c] returns the next clock value for a local event ([c + 1]). *)

val merge : recv:t -> local:t -> t
(** [merge ~recv ~local] is the post-receive clock: [max(recv, local) + 1]. Apply on every observed
remote op. *)

val compare : t -> t -> int
(** Total order on clock values. Stable, suitable for [Map.Make] / [Set.Make] keys. *)

val equal : t -> t -> bool
(** [equal a b] iff [compare a b = 0]. *)

val pp : Format.formatter -> t -> unit
(** Pretty-print as a decimal integer. *)

val to_int64 : t -> int64
(** Wire projection. Returns the underlying nonnegative [int64]. *)

val of_int64 : int64 -> t
(** Wire injection. Raises [Failure] if [i] is negative. Callers (e.g. wire codec) are expected to
validate / surface malformed input upstream. *)
8 changes: 8 additions & 0 deletions core/lib/crdt/op.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(** Op kind variant. Placeholder for CORE-1; CORE-8 fills the closed enum. *)

type kind = Placeholder

let pp_kind (fmt : Format.formatter) (k : kind) : unit =
match k with Placeholder -> Format.fprintf fmt "Placeholder"

let equal_kind (a : kind) (b : kind) : bool = match (a, b) with Placeholder, Placeholder -> true
11 changes: 11 additions & 0 deletions core/lib/crdt/op.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(** Op kind variant.

Placeholder for v0.1; the closed enum of CRDT ops (text.insert, map.set, xml.setAttr, acl.grant,
migrate, ...) lands in CORE-8. Each constructor carries its own target + payload — there is no
separate [target] / [payload] field on the envelope. See ARCHITECTURE.md, sections "Internal
Data Model" and "Supported Operations". *)

type kind = Placeholder (** v0.1 placeholder. CORE-8 replaces with the full closed enum. *)

val pp_kind : Format.formatter -> kind -> unit
val equal_kind : kind -> kind -> bool
16 changes: 16 additions & 0 deletions core/lib/crdt/op_id.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
(** Op identity. *)

type t = { client_id : Uuid_v7.t; client_seq : int64 }

let make ~client_id:cid ~client_seq:cseq : t = { client_id = cid; client_seq = cseq }
let client_id (opid : t) : Uuid_v7.t = opid.client_id
let client_seq (opid : t) : int64 = opid.client_seq

let compare (a : t) (b : t) : int =
let c = Uuid_v7.compare a.client_id b.client_id in
if c <> 0 then c else Int64.compare a.client_seq b.client_seq

let equal (a : t) (b : t) : bool = a.client_id = b.client_id && a.client_seq = b.client_seq

let pp (fmt : Format.formatter) (opid : t) : unit =
Format.fprintf fmt "%a:%Ld" Uuid_v7.pp opid.client_id opid.client_seq
27 changes: 27 additions & 0 deletions core/lib/crdt/op_id.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(** Op identity: [(client_id, client_seq)].

Globally-unique identifier for every operation in the system. Used for idempotency (server
ignores already-seen op_ids), undo-stack indexing, audit, and reconnect resume. See
ARCHITECTURE.md, sections "Internal Data Model" and "Idempotency". *)

type t
(** Abstract. Construct via {!make}; project fields via {!client_id} / {!client_seq}. *)

val make : client_id:Uuid_v7.t -> client_seq:int64 -> t
(** [make ~client_id ~client_seq] constructs an op_id.

[client_seq] is the monotonic per-client sequence number; callers are responsible for
monotonicity (no enforcement here). *)

val client_id : t -> Uuid_v7.t
val client_seq : t -> int64

val compare : t -> t -> int
(** Total order on op_ids. Compares [client_id] first, then [client_seq] on tie. Stable, suitable
for [Map.Make] / [Set.Make] keys. *)

val equal : t -> t -> bool
(** [equal a b] iff [compare a b = 0]. *)

val pp : Format.formatter -> t -> unit
(** Pretty-print as ["<client_id>:<client_seq>"]. *)
35 changes: 35 additions & 0 deletions core/lib/crdt/uuid_v7.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
(** UUID v7 wrapper over Uuidm *)

type t = Uuidm.t

let gen =
Uuidm.v7_monotonic_gen
~now_ms:(fun () -> Int64.of_float (Unix.gettimeofday () *. 1000.))
(Random.State.make_self_init ())

let rec v () : t =
match gen () with
| Some uuid -> uuid
| None ->
(* There is a limit of 2^12 UUIDs per millisecond *)
(* If we hit that limit, we can just wait a bit and try again *)
Unix.sleepf 0.001;
v ()

let to_bytes (id : t) : bytes = Uuidm.to_binary_string id |> Bytes.of_string

let of_bytes (bs : bytes) : t option =
if Bytes.length bs <> 16 then None
else
let s = Bytes.to_string bs in
match Uuidm.of_binary_string s with
| Some uuid when Uuidm.version uuid = 7 -> Some uuid
| _ -> None

let to_string (id : t) : string = Uuidm.to_string id

let of_string (str : string) : t option =
match Uuidm.of_string str with Some uuid when Uuidm.version uuid = 7 -> Some uuid | _ -> None

let compare = Uuidm.compare
let pp = Uuidm.pp
32 changes: 32 additions & 0 deletions core/lib/crdt/uuid_v7.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
(** UUID version 7 (RFC 9562 §5.7) — time-ordered, 128 bits.

Thin wrapper over {!Uuidm} that pins the variant to v7. All [client_id] values in the crdtsync
wire envelope are values of this type. *)

type t
(** A v7 UUID. Abstract; round-trip via {!to_bytes} / {!of_bytes} or {!to_string} / {!of_string}. *)

val v : unit -> t
(** Generate a fresh v7 UUID. Embeds the current millisecond timestamp + random bits per RFC 9562.
*)

val to_bytes : t -> bytes
(** 16-byte binary representation. *)

val of_bytes : bytes -> t option
(** Parse a 16-byte binary representation. Returns [None] if the input is not 16 bytes or the
version nibble is not [0x7]. *)

val to_string : t -> string
(** Standard 36-character hex form (e.g. ["018f2c5b-d6f3-7000-89ab-...]"). *)

val of_string : string -> t option
(** Parse the 36-character hex form. Returns [None] on malformed input or when the version nibble is
not [0x7]. *)

val compare : t -> t -> int
(** Total order. For v7, this is also a time-ordering of generation (timestamps tiebroken by random
bits). *)

val pp : Format.formatter -> t -> unit
(** Pretty-print as the standard 36-character hex form. *)
13 changes: 13 additions & 0 deletions core/lib/crdt/wall_time.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(** Wall-clock time. *)

type t = int64

let now () : t = Int64.of_float (Unix.gettimeofday () *. 1000.0)

let of_ms (i : int64) : t =
if i < 0L then failwith "Wall_time.of_ms: negative value not allowed" else i

let to_ms (t : t) : int64 = t
let compare = Int64.compare
let equal = Int64.equal
let pp (fmt : Format.formatter) (t : t) : unit = Format.fprintf fmt "%Ld" t
24 changes: 24 additions & 0 deletions core/lib/crdt/wall_time.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(** Wall-clock time.

Real-world timestamp captured at op authoring. Informational only — not used for causality (see
{!Lamport} for that). Powers debug / audit / display / analytics. See ARCHITECTURE.md, section
"Internal Data Model". *)

type t
(** Abstract, nonnegative [int64] milliseconds since the Unix epoch (1970-01-01 UTC). Pre-epoch
timestamps not supported. *)

val now : unit -> t
(** Current wall-clock time. *)

val of_ms : int64 -> t
(** Wire injection. Raises [Failure] if [ms] is negative. *)

val to_ms : t -> int64
(** Wire projection. Returns ms since Unix epoch. *)

val compare : t -> t -> int
val equal : t -> t -> bool

val pp : Format.formatter -> t -> unit
(** Pretty-print as a decimal millisecond count. *)
3 changes: 3 additions & 0 deletions core/lib/persist/crdtsync_persist.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@
Lands per PERSIST-1 through PERSIST-5 (see KANBAN.md). Design: see ARCHITECTURE.md, sections
"Persistence Architecture", "Snapshots". *)

(* Wrapper module for the [crdtsync_persist] library. Submodules from sibling files must be
re-exported here to be reachable as [Crdtsync_persist.<Submodule>] from outside. *)

let version = "0.0.0"
Loading
Loading