Make ReadRefReader.buf public #300

Closed
wbenny opened this issue Mar 17, 2022 · 12 comments

Comments

@wbenny

wbenny commented Mar 17, 2022

Hi,
I want to deserialize a stream of MessagePack structures with zero-copy.

But when I create a ReadRefReader with:

    let contents = std::fs::read(&path).unwrap();
    let mut deserializer = Deserializer::from_read_ref(&contents);

... and then deserialize the first struct:

    let msg = Message::deserialize(&mut deserializer)?;

I have no way of knowing where the deserializer stopped.

However, if I added this to the impl<'de, R> Deserializer<ReadRefReader<'de, R>>:

    pub fn get_buf(&self) -> &'de [u8] {
        self.rd.buf
    }

... and used it like this:

    let mut buf = &contents[..];

    let mut result = Vec::new();

    loop {
        let mut deserializer = Deserializer::from_read_ref(&buf[..]);

        if let Ok(event) = Message::deserialize(&mut deserializer) {
            result.push(event);
        }
        else {
            break;
        }

        buf = deserializer.get_buf();
    }

It would work.

So the question is: am I missing something? Is there a way to deserialize streaming MessagePack structs from a buffer reference? If not, would it be possible to add the get_buf() method? And if not... can you think of a better way to implement this?
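For illustration, the "parse one message, then continue from wherever the parser stopped" loop can be sketched without rmp_serde at all. The record format here (one length byte followed by UTF-8 text) and the names `parse_one` and `parse_all` are hypothetical stand-ins, not MessagePack and not part of any library. The sketch only shows that each parsed value can borrow from the original buffer while the unread remainder is handed back to the caller.

```rust
/// Hypothetical toy format: one length byte, then that many UTF-8 bytes.
/// Returns the borrowed message and the unread remainder of `buf`.
fn parse_one<'a>(buf: &'a [u8]) -> Option<(&'a str, &'a [u8])> {
    let (&len, rest) = buf.split_first()?;
    let len = len as usize;
    if rest.len() < len {
        return None; // truncated record
    }
    let (body, tail) = rest.split_at(len);
    let msg = std::str::from_utf8(body).ok()?;
    Some((msg, tail))
}

/// Parses records until the input is exhausted or a record is malformed.
/// Every returned `&str` points into `buf`; nothing is copied.
fn parse_all<'a>(mut buf: &'a [u8]) -> Vec<&'a str> {
    let mut result = Vec::new();
    while let Some((msg, tail)) = parse_one(buf) {
        result.push(msg);
        buf = tail; // continue where the previous record ended
    }
    result
}

fn main() {
    let contents: Vec<u8> = b"\x05Hello\x05World".to_vec();
    let messages = parse_all(&contents);
    println!("{:?}", messages); // prints ["Hello", "World"]
}
```

The proposed get_buf() would play the role of `tail` here: it exposes where the parser stopped without giving up the lifetime of the original buffer.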

@kornelski
Collaborator

I haven't tried parsing concatenated messages like this, so I'm not sure if the reader is reading only as little as necessary, but if it is, then you should be able to use Cursor to reuse the reader across multiple calls, or even &mut &[u8] IIRC, since it also implements Read and remembers the position read so far.
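As a small std-only illustration of the Cursor part of this suggestion: a `std::io::Cursor` over a byte slice implements `Read` and tracks how many bytes have been consumed. The helper name `position_after_reading` is made up for this sketch.

```rust
use std::io::{Cursor, Read};

/// Reads `n` bytes through a Cursor and reports how far it has advanced.
fn position_after_reading(data: &[u8], n: usize) -> std::io::Result<u64> {
    let mut cursor = Cursor::new(data);
    let mut scratch = vec![0u8; n];
    cursor.read_exact(&mut scratch)?; // copies n bytes into `scratch`
    Ok(cursor.position())
}

fn main() {
    let data = [10u8, 20, 30, 40, 50];
    let pos = position_after_reading(&data, 2).unwrap() as usize;
    // The unread remainder can be recovered from the position.
    println!("consumed {} bytes, remaining {:?}", pos, &data[pos..]);
}
```

Note that reading through `Read` copies bytes into the caller's buffer, so this alone does not give zero-copy borrowing.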

@wbenny
Author

wbenny commented Mar 17, 2022

Thanks for the prompt response!

From what I've been currently testing, it consumes only what is necessary - and I'm parsing 10k's of concatenated messages, which use various types/lists/maps/extdata.

Cursor, unfortunately, does not implement the AsRef trait, so it cannot be wrapped in a ReadRefReader. As for the &mut &[u8]... I don't think I understand what you meant by that. Any hint would be appreciated.

@kornelski
Collaborator

kornelski commented Mar 17, 2022

Read is implemented for &mut Read, so you can have AsRef for any reader.

Read is also implemented directly on slices, and reading them shortens the slice (mutates the slice metadata in place to remove the bit that has been read).
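A minimal std-only sketch of that behaviour: reading from a `&[u8]` through `Read` moves the slice forward, so the same variable afterwards holds only the unread bytes. The helper name `take_prefix` is hypothetical.

```rust
use std::io::Read;

/// Reads `n` bytes from the front of `reader` and returns them.
/// `reader` itself is shortened to the unread remainder.
fn take_prefix(reader: &mut &[u8], n: usize) -> std::io::Result<Vec<u8>> {
    let mut head = vec![0u8; n];
    reader.read_exact(&mut head)?;
    Ok(head)
}

fn main() {
    let data = [1u8, 2, 3, 4, 5];
    let mut reader: &[u8] = &data;

    let head = take_prefix(&mut reader, 2).unwrap();

    assert_eq!(head, vec![1, 2]);
    assert_eq!(reader, &[3u8, 4, 5][..]); // the slice now starts after the bytes read
    println!("head = {:?}, rest = {:?}", head, reader);
}
```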

@wbenny
Author

wbenny commented Mar 17, 2022

I think I got it (or did I?), but... it produces an error.

pub fn parse<'a>(contents: &'a [u8]) -> Vec<Message<'a>> {
    let mut result = Vec::new();

    let mut deserializer = Deserializer::from_read_ref(&mut contents.as_ref());

    loop {
        if let Ok(message) = Message::deserialize(&mut deserializer) {
            result.push(message);
        }
        else {
            break;
        }
    }

    result
// ^^^^^^ returns a value referencing data owned by the current function
}
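The error is consistent with the borrowed output being tied to the temporary `&[u8]` created inside the function rather than to `contents` itself. A minimal sketch of that situation, assuming from_read_ref has roughly the shape `fn(&'de R) where R: AsRef<[u8]> + ?Sized` and hands out data valid for `'de`. The function `borrow_via_ref` below is a hypothetical stand-in with that shape, not the rmp_serde API.

```rust
// Hypothetical stand-in: the output borrows from the reference it is given.
fn borrow_via_ref<'de, R: AsRef<[u8]> + ?Sized>(rd: &'de R) -> &'de [u8] {
    rd.as_ref()
}

// Does NOT compile (E0515): here R = &[u8], so 'de is the lifetime of the
// borrow of `local`, which ends when the function returns, even though the
// bytes themselves live for 'a.
//
// fn broken<'a>(contents: &'a [u8]) -> &'a [u8] {
//     let local: &[u8] = contents;
//     borrow_via_ref(&local)
// }

// Compiles: here R = [u8], so 'de is 'a and the output may be returned.
fn works<'a>(contents: &'a [u8]) -> &'a [u8] {
    borrow_via_ref(contents)
}

fn main() {
    let data = vec![1u8, 2, 3];
    let out = works(&data);
    println!("{:?}", out); // prints [1, 2, 3]
}
```

If the assumption about the signature holds, passing `contents` directly (so that `R` is `[u8]`) instead of a reference to a local slice should let the results carry the caller's lifetime. That has not been checked against rmp_serde here.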

@wbenny
Author

wbenny commented Mar 17, 2022

I'm really sorry, but I'm short of any ideas. Would you be kind enough to provide a short example? Either with &mut &[u8] or with Cursor?

@wbenny
Author

wbenny commented Mar 17, 2022

So, this is working, in the sense that the result is properly filled:

pub fn parse(contents: &[u8]) -> Vec<Message> {
    let mut result = Vec::new();

    let cursor = &mut contents.as_ref(); // or &mut &contents
    let mut deserializer = Deserializer::from_read_ref(cursor);

    loop {
        let r = Message::deserialize(&mut deserializer);
        if let Ok(message) = r {
            result.push(message);
        }
        else {
            break;
        }
    }

    // result
    Vec::new()
}

However, I still can't return the result from the function. I think I understand the reason, but I don't know how to work around it. Except for maybe rewriting the method as fn parse(contents: &mut &[u8]) - which I would like to avoid.

EDIT:
Confirming that it does work when I rewrite it as pub fn parse<'a>(contents: &'a mut &[u8]) -> Vec<Message<'a>>.
However, calling such a function becomes quite ugly:

    let log = std::fs::read(r#"logs\log.msgpack"#).unwrap();
    let log = &mut &log[..]; // wtf
    let messages = message::parser::parse_binlog(log);

@kornelski
Collaborator

kornelski commented Mar 18, 2022

edit: never mind. I mistakenly thought the from_read_ref function takes &impl Read, not AsRef<[u8]>!

What I meant about the cursor, etc. was for from_read, which takes an actual Read implementation.

from_read_ref is silly, because it requires that the argument can be referenced as a slice, so it's 100% redundant with from_slice. I see no point in having it. I'll mark it as deprecated.

@wbenny
Author

wbenny commented Mar 18, 2022

But there isn't a Deserializer::from_slice method. Only rmp_serde::from_slice, which returns T. Also, for some reason I feel more and more lost with each of your new comments :) I still have no idea how to properly resolve this issue.

Deserializer::from_read_ref is really useful for my case, because otherwise there wouldn't be any way to achieve what I want to do.

Replacing let r = Message::deserialize(&mut deserializer); with rmp_serde::from_slice::<Message>(contents) wouldn't work, because from_slice internally creates a new deserializer each time, so it would end up in an infinite loop.

@kornelski
Collaborator

kornelski commented Mar 22, 2022

I mean this:

#[derive(serde::Serialize, serde::Deserialize)]
struct Test {
    msg: String,
}

fn main() {
    let mut ser = Vec::new();

    rmp_serde::encode::write(&mut ser, &Test {msg: "Hello".into()}).unwrap();
    rmp_serde::encode::write(&mut ser, &Test {msg: "World".into()}).unwrap();

    /////////////////////

    let mut reader = ser.as_slice();

    let one: Test = rmp_serde::from_read(&mut reader).unwrap();
    let two: Test = rmp_serde::from_read(&mut reader).unwrap();

    println!("{} {}", one.msg, two.msg);
}

You don't need direct use of deserializer or any special buffer manipulation, because Read already works with slices.

@wbenny
Author

wbenny commented Mar 22, 2022

Thanks, I really appreciate you answering. However, I stressed multiple times that I'm specifically interested in zero-copy deserialization, and this example, unfortunately, doesn't work:

#[derive(serde::Serialize, serde::Deserialize)]
struct Test<'a> {
    msg: &'a str,
}

fn main() {
    let mut ser = Vec::new();

    rmp_serde::encode::write(&mut ser, &Test {msg: "Hello"}).unwrap();
    rmp_serde::encode::write(&mut ser, &Test {msg: "World"}).unwrap();

    /////////////////////

    let mut reader = ser.as_slice();

    let one: Test = rmp_serde::from_read_ref(&mut reader).unwrap();
    let two: Test = rmp_serde::from_read_ref(&mut reader).unwrap();

    println!("{} {}", one.msg, two.msg);
}

@kornelski
Collaborator

Ah, yes. I forgot about this complication.

@wbenny
Author

wbenny commented Apr 19, 2022

Thanks @kornelski !
