
Should delete_set on Update be exposed? #374

Open · evanw opened this issue Jan 2, 2024 · 5 comments

@evanw commented Jan 2, 2024

Context: I'm trying to implement syncing between many peers in a mesh without sending unnecessary updates.

My understanding from the docs and code is that you need to compare snapshots instead of state vectors to determine if two peers have the same contents (a snapshot being a state vector and a delete set). This is because state vectors do not change when you delete things. So I'm trying to have each peer maintain a locally-predicted snapshot for each peer it's connected to in order to send minimal updates (i.e. to avoid sending a peer data that we know it already has). That snapshot is updated both when sending data to that peer (since that peer will have that data when it gets further messages from us) and when that peer sends data to us (since we know that peer has the data it sent).

Here is some sample code that demonstrates this. It creates three peers named a, b, and c and passes some mutations between them. Toward the end, c makes a delete-only edit and sends it to a and b:

Full code sample:
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use yrs::types::ToJson;
use yrs::updates::{decoder::*, encoder::*};
use yrs::*;

fn main() {
    let (b_send_to_a, a_recv_from_b) = channel();
    let (c_send_to_a, a_recv_from_c) = channel();
    let (a_send_to_b, b_recv_from_a) = channel();
    let (c_send_to_b, b_recv_from_c) = channel();
    let (a_send_to_c, c_recv_from_a) = channel();
    let (b_send_to_c, c_recv_from_b) = channel();
    let mut handles = Vec::new();

    // Spawn peer A
    {
        let doc = Doc::new();
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "a", 1);
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "a", 2);
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "a", 3);
        let doc = Arc::new(Mutex::new(doc));
        handles.push(start_sync("a<->b", doc.clone(), a_send_to_b, a_recv_from_b));
        handles.push(start_sync("a<->c", doc.clone(), a_send_to_c, a_recv_from_c));
    }

    // Spawn peer B
    {
        let doc = Doc::new();
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "b", -1);
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "b", -2);
        let doc = Arc::new(Mutex::new(doc));
        handles.push(start_sync("b<->a", doc.clone(), b_send_to_a, b_recv_from_a));
        handles.push(start_sync("b<->c", doc.clone(), b_send_to_c, b_recv_from_c));
    }

    // Spawn peer C
    {
        let doc = Doc::new();
        let foo = doc.get_or_insert_map("foo");
        let doc = Arc::new(Mutex::new(doc));
        handles.push(start_sync("c<->a", doc.clone(), c_send_to_a, c_recv_from_a));
        handles.push(start_sync("c<->b", doc.clone(), c_send_to_b, c_recv_from_b));

        // Test delete-only edits
        thread::sleep(Duration::from_millis(500));
        let doc = doc.lock().unwrap();
        let mut txn = doc.transact_mut();
        foo.remove(&mut txn, "a");
        foo.remove(&mut txn, "b");
        println!("[c] making a delete-only edit");
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

fn start_sync(
    id: &'static str,
    doc: Arc<Mutex<Doc>>,
    sender: Sender<Vec<u8>>,
    receiver: Receiver<Vec<u8>>,
) -> thread::JoinHandle<()> {
    return thread::spawn(move || {
        sender
            .send(doc.lock().unwrap().transact().snapshot().encode_v2())
            .unwrap();
        let their_snapshot = Snapshot::decode_v2(&receiver.recv().unwrap()).unwrap();
        let their_snapshot = Arc::new(Mutex::new(their_snapshot));
        let dirty_flag = Arc::new((Mutex::new(false), Condvar::new()));

        let read = thread::spawn({
            let doc = Arc::clone(&doc);
            let their_snapshot = Arc::clone(&their_snapshot);
            move || sync_read(id, doc, their_snapshot, receiver)
        });

        let write = thread::spawn({
            let doc = Arc::clone(&doc);
            let their_snapshot = Arc::clone(&their_snapshot);
            let dirty_flag = dirty_flag.clone();
            move || sync_write(id, doc, their_snapshot, sender, dirty_flag)
        });

        let observe = thread::spawn({
            let doc = Arc::clone(&doc);
            let dirty_flag = dirty_flag.clone();
            move || {
                let subscription = doc.lock().unwrap().observe_after_transaction(move |_| {
                    let (flag, dirty) = &*dirty_flag;
                    *flag.lock().unwrap() = true;
                    dirty.notify_one();
                });
                write.join().unwrap();

                // Work around a memory safety bug in yrs:
                // https://github.com/y-crdt/y-crdt/issues/373
                let doc = doc.lock().unwrap();
                drop(subscription);
                drop(doc);
            }
        });

        thread::sleep(Duration::from_millis(1_000));
        let doc = doc.lock().unwrap();
        let txn = doc.transact();
        println!(
            "[{id}] final: doc={:?} snapshot={:?}",
            doc.to_json(&txn),
            txn.snapshot(),
        );
        drop(txn);
        drop(doc);

        read.join().unwrap();
        observe.join().unwrap();
    });
}

fn sync_read(
    id: &str,
    doc: Arc<Mutex<Doc>>,
    their_snapshot: Arc<Mutex<Snapshot>>,
    receiver: Receiver<Vec<u8>>,
) {
    loop {
        let update = Update::decode_v2(&receiver.recv().unwrap()).unwrap();
        let doc = doc.lock().unwrap();

        // Update our local prediction of their snapshot with the data they will now have
        let mut their_snapshot = their_snapshot.lock().unwrap();
        their_snapshot.delete_set.merge(update.delete_set.clone());
        their_snapshot.delete_set.squash();
        their_snapshot.state_map.merge(update.state_vector());

        // Send the missing data to them
        let mut txn = doc.transact_mut();
        txn.apply_update(update);
        println!(
            "[{id}] recv: doc={:?} snapshot={:?}",
            doc.to_json(&txn),
            txn.snapshot(),
        );
    }
}

fn sync_write(
    id: &str,
    doc: Arc<Mutex<Doc>>,
    their_snapshot: Arc<Mutex<Snapshot>>,
    sender: Sender<Vec<u8>>,
    dirty_flag: Arc<(Mutex<bool>, Condvar)>,
) {
    loop {
        let doc = doc.lock().unwrap();
        let txn = doc.transact();
        let our_snapshot = txn.snapshot();
        let mut their_snapshot = their_snapshot.lock().unwrap();
        let mut their_next_snapshot = their_snapshot.clone();

        // See what happens to their snapshot if we apply our update
        their_next_snapshot
            .delete_set
            .merge(our_snapshot.delete_set.clone());
        their_next_snapshot.delete_set.squash();
        their_next_snapshot.state_map.merge(our_snapshot.state_map);

        // If their snapshot will change, then they need this update
        if their_next_snapshot != *their_snapshot {
            let update = txn.encode_state_as_update_v2(&their_snapshot.state_map);
            println!(
                "[{id}] send: doc={:?} snapshot={:?}",
                doc.to_json(&txn),
                txn.snapshot(),
            );
            drop(txn);
            *their_snapshot = their_next_snapshot;
            sender.send(update).unwrap();
        } else {
            // Otherwise they don't need to be updated at all
            drop(txn);
        }
        drop(their_snapshot);
        drop(doc);

        // Wait until after the next transaction
        let (flag, dirty) = &*dirty_flag;
        let mut flag = flag.lock().unwrap();
        while !*flag {
            flag = dirty.wait(flag).unwrap();
        }
        *flag = false;
    }
}

The problem I'm having is that delete_set on Update is private, and doesn't have a getter. If I don't have that information, then a and b both redundantly send c a copy of its own edit back for 6 update messages total (counting after c makes its edit). But if I do have that information, a and b can avoid sending the edit back to c for 4 messages total. Specifically c → a, c → b, a → b, and b → a but not a → c or b → c.

Is there a reason that delete_set on Update isn't exposed? For example, maybe I'm misunderstanding delete sets and this information isn't actually useful for what I'm trying to use it for? Or if it would be useful for this purpose, can it be exposed in the API?
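
Concretely, what I'm asking for is something like this (purely hypothetical; the getter doesn't exist today, and the name is just illustrative):

// Hypothetical addition inside yrs: a read-only view of the delete set
// carried by an Update (Update.delete_set is currently a private field
// with no accessor).
impl Update {
    pub fn delete_set(&self) -> &DeleteSet {
        &self.delete_set
    }
}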

@dmonad (Contributor) commented Jan 2, 2024

The deleteset is pretty small. So I suggest exchanging it with all connected peers. My reasoning: Comparing snapshots (which contain a deleteset) is less efficient than exchanging state vectors and sending the full deleteset (even if the peers are already fully synced).

One optimization would be to hash the snapshot to determine if a sync is necessary. If the hashed snapshot is the same for two peers, they don't need to do a full sync. This works in Yjs because the encoded snapshot is deterministically encoded. I'm not sure if that already works in y-crdt.
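
For example, something along these lines in yrs (just a sketch; it assumes the v2 snapshot encoding is deterministic, which I haven't verified for y-crdt):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use yrs::updates::encoder::Encode;
use yrs::{Doc, ReadTxn, Transact};

// Hash the encoded snapshot; if two peers report the same hash, no sync is
// needed, otherwise fall back to a full exchange.
fn snapshot_hash(doc: &Doc) -> u64 {
    let encoded = doc.transact().snapshot().encode_v2();
    let mut hasher = DefaultHasher::new();
    encoded.hash(&mut hasher);
    hasher.finish()
}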

@evanw (Author) commented Jan 2, 2024

The deleteset is pretty small. So I suggest exchanging it with all connected peers. My reasoning: Comparing snapshots (which contain a deleteset) is less efficient than exchanging state vectors and sending the full deleteset (even if the peers are already fully synced).

Thanks for your reply. Exchanging the delete set with all connected peers is exactly what I'm trying to do. I sort of see your point about sending snapshots being less efficient than state vectors plus updates: if you only consider a single sync, then "snapshot + maybe an update" means possibly sending the delete set twice, while "state vector + always an update" means always sending the delete set once (although if no update is needed, a snapshot could be considered more efficient, because it takes less round-trip time to determine that you're fully synced).

But if you think about an ongoing stream of updates, each update already includes a state vector and a delete set. It seems wasteful to either a) send another copy of the delete set along with each update (because the delete set is currently a private field) so that the other peer can determine when additional updates are necessary or b) always send out updates to peers even when we could have determined that the update was unnecessary.

Instead, it seems like it would be more efficient to be able to reuse the existing delete set that incoming updates contain to update the locally-predicted delete set for the remote peer. I'm assuming it's possible to keep the full delete set updated by starting with the full initial value and then merging in the delete sets of all incoming updates. If that's not possible, then that would be good to know as well.

Here's what I'm trying to achieve in rough pseudocode:

fn sync(doc, socket, events) {
    // Exchange snapshots (i.e. state vectors + delete sets)
    socket.send(doc.snapshot)
    let their_snapshot = socket.recv()

    let next_event = AfterTransaction
    loop {
        match next_event {
            RecvUpdate(update) => {
                let snapshot_update = Snapshot {
                    delete_set: update.delete_set, // This is what I was trying to access
                    state_vector: update.state_vector
                }
                their_snapshot.merge_with(snapshot_update)
                doc.apply_update(update)
            }

            AfterTransaction => {
                let their_next_snapshot = their_snapshot.clone()
                their_next_snapshot.merge_with(doc.snapshot)

                // Only send them an update if they need one
                if their_next_snapshot != their_snapshot {
                    let update = doc.encode_state_as_update(their_snapshot.state_vector)
                    socket.send(update)
                    their_snapshot = their_next_snapshot
                }
            }
        }
        next_event = events.next()
    }
}

My problem is that update.delete_set is currently inaccessible. I could also explicitly exchange the delete set with each connected peer as you suggest (again in rough pseudocode):

 fn sync(doc, socket, events) {
     // Exchange snapshots (i.e. state vectors + delete sets)
     socket.send(doc.snapshot)
     let their_snapshot = socket.recv()
 
     let next_event = AfterTransaction
     loop {
         match next_event {
-            RecvUpdate(update) => {
+            RecvUpdate(update, delete_set) => {
                 let snapshot_update = Snapshot {
-                    delete_set: update.delete_set, // This is what I was trying to access
+                    delete_set, // This currently must be sent as an additional copy
                     state_vector: update.state_vector
                 }
                 their_snapshot.merge_with(snapshot_update)
                 doc.apply_update(update)
             }
 
             AfterTransaction => {
                 let their_next_snapshot = their_snapshot.clone()
                 their_next_snapshot.merge_with(doc.snapshot)
 
                 // Only send them an update if they need one
                 if their_next_snapshot != their_snapshot {
                     let update = doc.encode_state_as_update(their_snapshot.state_vector)
-                    socket.send(update)
+                    socket.send(update, doc.snapshot.delete_set)
                     their_snapshot = their_next_snapshot
                 }
             }
         }
         next_event = events.next()
     }
 }

But that seems wasteful, since sending another copy of the delete set is redundant.

Sending a hash of the snapshot as you suggest would also work: all CRDTs converge eventually, so the hashes will eventually be equal and update propagation will stop. But using locally-predicted snapshots for remote peers instead of a snapshot hash to determine dirty state seems more precise, and therefore more desirable. The difference is that a hash equality comparison can't tell you when one side's data is a subset of the other's (in which case no update is needed), while a snapshot comparison can (from my understanding, at least).
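
In code, the comparison I have in mind is roughly this (a sketch only, reusing yrs's Snapshot, whose state_map and delete_set fields are public):

use yrs::Snapshot;

// If merging our snapshot into theirs leaves theirs unchanged, their data is
// already a superset of ours and no update needs to be sent, even when the
// two snapshots (and therefore their hashes) are not equal.
fn they_already_have_our_data(ours: &Snapshot, theirs: &Snapshot) -> bool {
    let mut merged = theirs.clone();
    merged.delete_set.merge(ours.delete_set.clone());
    merged.delete_set.squash();
    merged.state_map.merge(ours.state_map.clone());
    merged == *theirs
}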

@Horusiath (Collaborator) commented:

The standard way the network protocol works is:

  1. Peer A initializes the connection and sends its state vector alone to peer B.
  2. Peer B computes the changes based on that state vector and sends a summary containing the updates peer A is missing, together with the whole delete set.
  3. On subsequent updates, peers send updates which contain only the changed data, including only subsets of the delete set.

Any redundancy here comes from pt. 2, where we initially send the entire delete set, but as @dmonad mentioned this happens once per connection and it compresses pretty well. Even for hundreds of thousands of deleted elements, the payload shouldn't take more than a few kB in the worst case.

Sending a snapshot instead of a state vector doesn't necessarily bring much of an improvement overall, as we're still sending the delete set around; it's just sent as part of pt. 1 instead of pt. 2.
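
In terms of the public yrs API, that flow is roughly the following (a sketch only, not the actual y-sync protocol implementation):

use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;
use yrs::{Doc, ReadTxn, StateVector, Transact, Update};

// Pt. 1: peer A sends its state vector alone.
fn step_1(doc: &Doc) -> Vec<u8> {
    doc.transact().state_vector().encode_v2()
}

// Pt. 2: peer B replies with everything A is missing; on this initial
// exchange the payload also carries the whole delete set.
fn step_2(doc: &Doc, remote_sv: &[u8]) -> Vec<u8> {
    let sv = StateVector::decode_v2(remote_sv).unwrap();
    doc.transact().encode_state_as_update_v2(&sv)
}

// Pt. 3: subsequent incremental updates contain only the changed data and
// only the corresponding subset of the delete set.
fn apply(doc: &Doc, update: &[u8]) {
    let update = Update::decode_v2(update).unwrap();
    doc.transact_mut().apply_update(update);
}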

@evanw maybe you could share some numbers that cause problems in your use case? Does your use case have some unique constraints that have to be met?

@evanw (Author) commented Jan 10, 2024

@evanw maybe you could share some numbers that cause problems in your use case? Does your use case have some unique constraints that have to be met?

That's what I meant by this:

If I don't have that information, then a and b both redundantly send c a copy of its own edit back for 6 update messages total (counting after c makes its edit). But if I do have that information, a and b can avoid sending the edit back to c for 4 messages total. Specifically c → a, c → b, a → b, and b → a but not a → c or b → c.

I believe having this information means peers can send fewer updates because they have a more accurate model of the data that other peers already have. In a peer-to-peer scenario (not a client-to-server scenario), each peer may have a different subset of the data as updates propagate around. It's more efficient when you can know if a given remote peer's data is a superset of yours (i.e. if merging your snapshot into theirs doesn't change it) even if the data is not exactly equal to yours (i.e. if the snapshot hashes are not equal).

That said, I don't have a benchmark with production numbers for this at the moment. Without access to this information, unnecessary updates seem bound to be propagated, so there appears to be an inefficiency here, but I don't have real numbers to back that up. I can close this issue if it's unwelcome. No worries.

On subsequent updates, peers send updates which contain only the changed data, including only subsets of the delete set.

It's this subset of the delete set that I'm interested in. I'd like to use it to update each peer's model of the delete sets of remote peers over time as new updates come in, except that I can't access it because it's currently private.

@dmonad (Contributor) commented Jan 10, 2024

It's more efficient when you can know if a given remote peer's data is a superset of yours (i.e. if merging your snapshot into theirs doesn't change it) even if the data is not exactly equal to yours (i.e. if the snapshot hashes are not equal).

Interesting point. I can imagine it might be worth experimenting with this.

I'm not sure if @Horusiath wants to expose the deleteset, because changing the interface in the future would be a breaking change.

Just food for thought:

In a totally connected network of three peers, the optimal approach only sends four messages at most. This should already be possible if you are distributing messages using the update event. I know that this is possible with Yjs & y-webrtc.

The update event shouldn't fire if the client already has the update. The messages a → b and b → a won't have any effect on the a/b clients because they already received the update. Therefore, the clients won't generate new updates, so they won't send a → c or b → c.
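
In yrs terms, that corresponds roughly to the update observer (a sketch; exact observer signatures may differ between versions):

use yrs::{Doc, Transact};

fn main() {
    let doc = Doc::new();

    // Per the behavior described above, this shouldn't fire for an incoming
    // update the client already has, so there is nothing to re-broadcast.
    let _sub = doc.observe_update_v2(|_txn, event| {
        // event.update contains just the data produced by this transaction;
        // this is what would be relayed to connected peers.
        println!("would relay {} bytes", event.update.len());
    });

    doc.get_or_insert_map("foo")
        .insert(&mut doc.transact_mut(), "a", 1);
}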

Just to reiterate: the full deleteset is only sent on the initial sync. Every subsequent change contains only differences, never the full deleteset. A reliable protocol that works for p2p networks (even partially connected ones) and also star topologies is described here: https://github.com/yjs/y-protocols/blob/master/PROTOCOL.md#sync-protocol-v1-encoding
The idea is that connected peers sync using sync step 1 & sync step 2. y-webrtc limits the number of exchanged messages by building a partially connected network. Still, each message will be sent at most 2*C - Cp times (C = number of connections, Cp = number of connections of the message's author; e.g. with C = 5 and Cp = 3, that's at most 2*5 - 3 = 7 sends), which is not perfect. But it works really well even for larger networks if you reduce the number of connections per peer, which is necessary either way.

In any case, a protocol for managing a large p2p network is necessary.
