Memory safety issues (heap corruption and segfaults) #373

Open
evanw opened this issue Jan 2, 2024 · 2 comments

@evanw

evanw commented Jan 2, 2024

Context: I'm trying to implement syncing between many peers in a mesh without sending unnecessary updates. That involves each peer locally predicting the state of each remote peer it's connected to.
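To make the prediction idea concrete, here is a minimal std-only sketch. It assumes a state vector can be modelled as a map from client id to clock; the `StateVector` alias and the `merge` and `peer_needs_update` helpers are hypothetical stand-ins, not the yrs types or API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a state vector: client id -> next expected clock.
type StateVector = HashMap<u64, u32>;

/// Merge `other` into `target`, keeping the highest clock seen per client.
fn merge(target: &mut StateVector, other: &StateVector) {
    for (&client, &clock) in other {
        let current = target.get(&client).copied().unwrap_or(0);
        if clock > current {
            target.insert(client, clock);
        }
    }
}

/// Returns true if sending our state would advance the predicted remote state.
fn peer_needs_update(predicted_remote: &StateVector, ours: &StateVector) -> bool {
    let mut next = predicted_remote.clone();
    merge(&mut next, ours);
    next != *predicted_remote
}

fn main() {
    let ours: StateVector = HashMap::from([(1, 3), (2, 2)]);
    let mut predicted_remote: StateVector = HashMap::from([(2, 2)]);

    // The remote peer has not seen client 1 yet, so it needs an update.
    println!("{}", peer_needs_update(&predicted_remote, &ours));

    // After sending, record what the remote peer is expected to hold.
    merge(&mut predicted_remote, &ours);
    println!("{}", peer_needs_update(&predicted_remote, &ours));
}
```

This only covers the state-vector half of the prediction; deletions are not tracked here.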

I was going to ask a question about how to accomplish this by using delete sets from updates, but as I was writing some proof-of-concept code I ran into what seems to be a memory safety issue with this library. Here's my sample code that reproduces the memory safety issue:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;
use yrs::types::ToJson;
use yrs::updates::{decoder::*, encoder::*};
use yrs::*;

fn main() {
    let (b_send_to_a, a_recv_from_b) = channel();
    let (c_send_to_a, a_recv_from_c) = channel();
    let (a_send_to_b, b_recv_from_a) = channel();
    let (c_send_to_b, b_recv_from_c) = channel();
    let (a_send_to_c, c_recv_from_a) = channel();
    let (b_send_to_c, c_recv_from_b) = channel();
    let mut handles = Vec::new();

    // Spawn peer A
    {
        let doc = Doc::new();
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "a", 1);
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "a", 2);
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "a", 3);
        let doc = Arc::new(Mutex::new(doc));
        handles.push(start_sync("a<->b", doc.clone(), a_send_to_b, a_recv_from_b));
        handles.push(start_sync("a<->c", doc.clone(), a_send_to_c, a_recv_from_c));
    }

    // Spawn peer B
    {
        let doc = Doc::new();
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "b", -1);
        doc.get_or_insert_map("foo")
            .insert(&mut doc.transact_mut(), "b", -2);
        let doc = Arc::new(Mutex::new(doc));
        handles.push(start_sync("b<->a", doc.clone(), b_send_to_a, b_recv_from_a));
        handles.push(start_sync("b<->c", doc.clone(), b_send_to_c, b_recv_from_c));
    }

    // Spawn peer C
    {
        let doc = Doc::new();
        let doc = Arc::new(Mutex::new(doc));
        handles.push(start_sync("c<->a", doc.clone(), c_send_to_a, c_recv_from_a));
        handles.push(start_sync("c<->b", doc.clone(), c_send_to_b, c_recv_from_b));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

fn start_sync(
    id: &'static str,
    doc: Arc<Mutex<Doc>>,
    sender: Sender<Vec<u8>>,
    receiver: Receiver<Vec<u8>>,
) -> thread::JoinHandle<()> {
    return thread::spawn(move || {
        sender
            .send(doc.lock().unwrap().transact().snapshot().encode_v2())
            .unwrap();
        let their_snapshot = Snapshot::decode_v2(&receiver.recv().unwrap()).unwrap();
        let their_snapshot = Arc::new(Mutex::new(their_snapshot));

        let read = thread::spawn({
            let doc = Arc::clone(&doc);
            let their_snapshot = Arc::clone(&their_snapshot);
            move || sync_read(id, doc, their_snapshot, receiver)
        });

        let write = thread::spawn({
            let doc = Arc::clone(&doc);
            let their_snapshot = Arc::clone(&their_snapshot);
            move || sync_write(id, doc, their_snapshot, sender)
        });

        thread::sleep(Duration::from_millis(1_000));
        let doc = doc.lock().unwrap();
        let txn = doc.transact();
        println!(
            "[{id}] final: doc={:?} snapshot={:?}",
            doc.to_json(&txn),
            txn.snapshot(),
        );
        drop(txn);
        drop(doc);

        read.join().unwrap();
        write.join().unwrap();
    });
}

fn sync_read(
    id: &str,
    doc: Arc<Mutex<Doc>>,
    their_snapshot: Arc<Mutex<Snapshot>>,
    receiver: Receiver<Vec<u8>>,
) {
    loop {
        let update = Update::decode_v2(&receiver.recv().unwrap()).unwrap();
        let doc = doc.lock().unwrap();

        // Update our local prediction of their snapshot with the data they will now have
        let mut their_snapshot = their_snapshot.lock().unwrap();
        // their_snapshot.delete_set.merge(update.delete_set.clone());
        // their_snapshot.delete_set.squash();
        their_snapshot.state_map.merge(update.state_vector());
        drop(their_snapshot);

        // Send the missing data to them
        let mut txn = doc.transact_mut();
        txn.apply_update(update);
        println!(
            "[{id}] recv: doc={:?} snapshot={:?}",
            doc.to_json(&txn),
            txn.snapshot(),
        );
    }
}

fn sync_write(
    id: &str,
    doc: Arc<Mutex<Doc>>,
    their_snapshot: Arc<Mutex<Snapshot>>,
    sender: Sender<Vec<u8>>,
) {
    loop {
        let doc = doc.lock().unwrap();
        let txn = doc.transact();
        let our_snapshot = txn.snapshot();
        let mut their_snapshot = their_snapshot.lock().unwrap();
        let mut their_next_snapshot = their_snapshot.clone();

        // See what happens to their snapshot if we apply our update
        // their_next_snapshot.delete_set.merge(our_snapshot.delete_set.clone());
        // their_next_snapshot.delete_set.squash();
        their_next_snapshot.state_map.merge(our_snapshot.state_map);

        // If their snapshot will change, then they need this update
        if their_next_snapshot != *their_snapshot {
            let update = txn.encode_state_as_update_v2(&their_snapshot.state_map);
            println!(
                "[{id}] send: doc={:?} snapshot={:?}",
                doc.to_json(&txn),
                txn.snapshot(),
            );
            drop(txn);
            *their_snapshot = their_next_snapshot;
            sender.send(update).unwrap();
        } else {
            // Otherwise they don't need to be updated at all
            drop(txn);
        }
        drop(their_snapshot);

        // Wait until after the next transaction
        let barrier = Arc::new(Barrier::new(2));
        let subscription = doc.observe_after_transaction({
            let barrier = barrier.clone();
            move |_| {
                barrier.wait();
            }
        });
        drop(doc);
        barrier.wait();
        drop(subscription);
    }
}

I'm running this on a MacBook laptop with an M1 chip. Sometimes it runs fine, which looks like this:

[b<->a] send: doc=Map({"foo": Map({"b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1) }, state_map: StateVector({2523915461: 2}) }
[a<->b] send: doc=Map({"foo": Map({"a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 2366383435: [0..2) }, state_map: StateVector({2366383435: 3}) }
[b<->c] send: doc=Map({"foo": Map({"b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1) }, state_map: StateVector({2523915461: 2}) }
[a<->c] send: doc=Map({"foo": Map({"a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 2366383435: [0..2) }, state_map: StateVector({2366383435: 3}) }
[c<->b] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1) }, state_map: StateVector({2523915461: 2}) }
[a<->b] recv: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[b<->a] recv: doc=Map({"foo": Map({"b": Number(-2.0), "a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[a<->c] send: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[c<->a] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[b<->c] send: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[c<->b] send: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[c<->a] send: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[b<->c] recv: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[c<->a] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[a<->c] recv: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }
[c<->b] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2523915461: [0..1), 2366383435: [0..2) }, state_map: StateVector({2523915461: 2, 2366383435: 3}) }

Other times it crashes with a corrupted heap, like this:

[b<->a] send: doc=Map({"foo": Map({"b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1) }, state_map: StateVector({2170544377: 2}) }
[a<->c] send: doc=Map({"foo": Map({"a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 1112460419: [0..2) }, state_map: StateVector({1112460419: 3}) }
[b<->c] send: doc=Map({"foo": Map({"b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1) }, state_map: StateVector({2170544377: 2}) }
[a<->b] send: doc=Map({"foo": Map({"a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 1112460419: [0..2) }, state_map: StateVector({1112460419: 3}) }
[c<->a] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 1112460419: [0..2) }, state_map: StateVector({1112460419: 3}) }
[a<->b] recv: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1), 1112460419: [0..2) }, state_map: StateVector({2170544377: 2, 1112460419: 3}) }
[a<->c] send: doc=Map({"foo": Map({"b": Number(-2.0), "a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1), 1112460419: [0..2) }, state_map: StateVector({2170544377: 2, 1112460419: 3}) }
demo(48497,0x170bff000) malloc: Heap corruption detected, free list is damaged at 0x600000c10120
*** Incorrect guard value: 24053980266783
[c<->b] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1), 1112460419: [0..2) }, state_map: StateVector({2170544377: 2, 1112460419: 3}) }
[b<->a] recv: doc=Map({"foo": Map({"a": Number(3.0), "b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1), 1112460419: [0..2) }, state_map: StateVector({2170544377: 2, 1112460419: 3}) }
demo(48497,0x170bff000) malloc: *** set a breakpoint in malloc_error_break to debug
[c<->a] send: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 2170544377: [0..1), 1112460419: [0..2) }, state_map: StateVector({2170544377: 2, 1112460419: 3}) }
zsh: abort cargo run

And sometimes it just segfaults:

[a<->b] send: doc=Map({"foo": Map({"a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 4030028850: [0..2) }, state_map: StateVector({4030028850: 3}) }
[b<->a] send: doc=Map({"foo": Map({"b": Number(-2.0)})}) snapshot=Snapshot { delete_set:  { 3307822593: [0..1) }, state_map: StateVector({3307822593: 2}) }
[a<->c] send: doc=Map({"foo": Map({"a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 4030028850: [0..2) }, state_map: StateVector({4030028850: 3}) }
[b<->a] recv: doc=Map({"foo": Map({"b": Number(-2.0), "a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 3307822593: [0..1), 4030028850: [0..2) }, state_map: StateVector({3307822593: 2, 4030028850: 3}) }
[a<->b] recv: doc=Map({"foo": Map({"b": Number(-2.0), "a": Number(3.0)})}) snapshot=Snapshot { delete_set:  { 3307822593: [0..1), 4030028850: [0..2) }, state_map: StateVector({3307822593: 2, 4030028850: 3}) }
[c<->a] recv: doc=Map({"foo": Undefined}) snapshot=Snapshot { delete_set:  { 4030028850: [0..2) }, state_map: StateVector({4030028850: 3}) }
zsh: segmentation fault cargo run

Hopefully you can use my code to reproduce this issue as well. It happens pretty reliably for me (crashes ~50% of the time).

My understanding of Rust is that Rust code which doesn't use unsafe should either run to completion, hang, or panic, and that if heap corruption or segfaults happen, it indicates that a library is using unsafe incorrectly. Since my code does not use unsafe and I'm only using one library (yrs) which does use unsafe, I believe that means the memory safety issue is in yrs.

I'm already using Arc<Mutex<Doc>> as recommended by #68 (comment) so that I don't have overlapping calls to transact() and transact_mut(). I think this particular memory safety issue involves calling Subscription::drop while the document is not locked. Perhaps this means Doc is not thread-safe after all, and adding impl Send for Doc was incorrect? I realize that my code for dealing with subscriptions is buggy (the barrier may cause a deadlock on subsequent updates) but that still shouldn't result in a memory safety issue.
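Separately from the memory safety question, the barrier deadlock mentioned above could probably be avoided with a counter and a condition variable, so that the notifying side never waits for a listener to arrive. The sketch below is std-only and does not touch yrs; `ChangeSignal` and its methods are hypothetical names, and the assumption is that `notify` would be called from whatever after-transaction callback is in use.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical change notifier: a version counter guarded by a mutex.
#[derive(Default)]
struct ChangeSignal {
    version: Mutex<u64>,
    changed: Condvar,
}

impl ChangeSignal {
    /// Bumps the version and wakes any waiters; it never waits for a waiter to arrive.
    fn notify(&self) {
        *self.version.lock().unwrap() += 1;
        self.changed.notify_all();
    }

    /// Blocks until the version moves past `seen`, then returns the new version.
    fn wait_past(&self, seen: u64) -> u64 {
        let guard = self.version.lock().unwrap();
        let guard = self.changed.wait_while(guard, |v| *v <= seen).unwrap();
        *guard
    }
}

fn main() {
    let signal = Arc::new(ChangeSignal::default());
    let notifier = {
        let signal = Arc::clone(&signal);
        thread::spawn(move || {
            // Two back-to-back notifications with nobody waiting do not block.
            signal.notify();
            signal.notify();
        })
    };
    let seen = signal.wait_past(0);
    notifier.join().unwrap();
    println!("woke up at version {seen}");
}
```

Because the waiter compares against the last version it saw, a notification that fires before the waiter starts waiting is not lost. This addresses only the deadlock, not the heap corruption or segfaults.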

Edit: I also posted the original issue I was having here: #374

@Horusiath
Collaborator

Horusiath commented Jan 2, 2024

Generally speaking, snapshots may not work correctly with Doc::new, because the default options have block garbage collection turned on. You need to set the skip_gc flag (Doc::with_options(Options { skip_gc: true, ..Options::default() })). Otherwise a snapshot may try to roll the document state back to a point where it needs to reach an element that was unreachable from the latest document's point of view and was therefore garbage collected.

@evanw
Author

evanw commented Jan 2, 2024

Thanks for mentioning the GC flag. However, I'm just using a snapshot as shorthand for "tuple of state vector and delete set" and not using it for document rollback. From what I understand, with Yjs you need both of these values to determine if an update is necessary (i.e. an update is necessary if either the state vector or the delete set is different), which is why hashing the snapshot to determine if a sync is necessary is recommended.

I'm hoping that using delete sets like this is independent of the garbage collection setting. Is it ok to use the delete sets returned from Transaction::snapshot() for this purpose when garbage collection is off?
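The "either the state vector or the delete set is different" rule can be sketched with std-only stand-ins. The `PeerState` struct, the `DeleteSet` alias, and the `squash`, `merge_delete_sets`, and `needs_sync` helpers below are hypothetical and only model the idea; the real yrs delete set may normalise ranges differently.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical stand-in: client id -> deleted clock ranges (not the yrs type).
type DeleteSet = BTreeMap<u64, Vec<Range<u32>>>;

/// Hypothetical "tuple of state vector and delete set".
#[derive(Clone, PartialEq, Debug)]
struct PeerState {
    state_vector: BTreeMap<u64, u32>,
    delete_set: DeleteSet,
}

/// Sort one client's deleted ranges and fuse any that overlap or touch.
fn squash(ranges: &mut Vec<Range<u32>>) {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u32>> = Vec::new();
    for r in ranges.drain(..) {
        if let Some(last) = out.last_mut() {
            if r.start <= last.end {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r);
    }
    *ranges = out;
}

/// Merge `other` into `target` and normalise, so equal sets compare equal.
fn merge_delete_sets(target: &mut DeleteSet, other: &DeleteSet) {
    for (client, ranges) in other {
        target.entry(*client).or_default().extend(ranges.iter().cloned());
    }
    for ranges in target.values_mut() {
        squash(ranges);
    }
}

/// A sync is needed if either the state vector or the delete set differs.
fn needs_sync(ours: &PeerState, theirs: &PeerState) -> bool {
    ours.state_vector != theirs.state_vector || ours.delete_set != theirs.delete_set
}

fn main() {
    let sv = BTreeMap::from([(1, 3)]);
    let mut ours = PeerState { state_vector: sv.clone(), delete_set: BTreeMap::from([(1, vec![0..1])]) };
    let theirs = PeerState { state_vector: sv, delete_set: BTreeMap::from([(1, vec![0..2])]) };

    // Same state vector, different delete sets: a sync is still needed.
    println!("{}", needs_sync(&ours, &theirs));

    // After merging their deletions into ours, the two states agree.
    merge_delete_sets(&mut ours.delete_set, &theirs.delete_set);
    println!("{}", needs_sync(&ours, &theirs));
}
```

Squashing before comparing matters in this model: without it, two delete sets covering the same clocks could compare unequal just because their ranges are split differently.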
