Parse rows on demand? #91

Closed
daschl opened this issue Sep 18, 2017 · 24 comments
Comments

@daschl

daschl commented Sep 18, 2017

I'm trying to use rust-csv in an async context and am having trouble getting this done with the Reader. Maybe there is a way to improve the API? (Or am I missing something?)

So I'm consuming a futures::Stream and want to map each line, a String holding one CSV line, into a HashMap of its parsed fields.

So my question is: Is there a way to "setup" rust-csv with all the params and then push/consume lines in as they come in and consume the parsed output?

Thanks,
Michael

@BurntSushi
Owner

> So my question is: Is there a way to "setup" rust-csv with all the params and then push/consume lines in as they come in and consume the parsed output?

The simple answer is: yes. And you do it by implementing the Read trait.

If you can't do that, then you're probably signing up for some pain by insisting on reading CSV data asynchronously. An API based around lines isn't as simple as you might think, since you cannot guarantee that one line corresponds to one record (a quoted field can contain line breaks). So if you want to feed a CSV parser lines, then you actually wind up needing a full incremental parser. This is what the csv-core crate provides, but it is substantially less ergonomic than the csv crate proper. In fact, the csv crate is basically a convenience layer on top of csv-core, where that convenience comes with the cost of assuming an implementation of Read and the ability to dynamically allocate memory. So if you want to throw away one of those assumptions, then you necessarily need to throw away the convenience that comes with it. But csv-core itself is sufficiently general that you could use it in either a sync or an async context.

With all that said, I haven't used futures so I don't know what to tell you. I don't know if there's something you can do to provide an implementation of Read, or if you really do need to use csv-core.
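
To illustrate why one line does not always equal one record, here is a minimal standard-library sketch. It assumes RFC 4180-style quoting (a literal quote inside a quoted field is written as two quotes), so a record is complete once it contains an even number of quote characters. The function name `group_records` is made up for this example, and this is only a line-grouping heuristic, not how the csv or csv-core crates parse.

```rust
/// Groups physical lines into logical CSV records.
///
/// Assumption: RFC 4180-style quoting, where a literal quote inside a
/// quoted field is doubled. Under that convention a well-formed record
/// contains an even number of `"` characters.
fn group_records(lines: &[&str]) -> Vec<String> {
    let mut records = Vec::new();
    let mut pending = String::new();
    let mut quotes = 0usize;
    for line in lines {
        if !pending.is_empty() {
            // The previous line ended inside a quoted field, so the line
            // break belongs to the field's value.
            pending.push('\n');
        }
        pending.push_str(line);
        quotes += line.matches('"').count();
        if quotes % 2 == 0 {
            records.push(std::mem::take(&mut pending));
            quotes = 0;
        }
    }
    // A trailing record with an unclosed quote is dropped in this sketch.
    records
}

fn main() {
    let lines = ["id,comment", "1,\"first line", "second line\"", "2,plain"];
    for record in group_records(&lines) {
        println!("{:?}", record);
    }
}
```

Four input lines yield three records here, because the second and third lines belong to the same quoted field. A line-oriented stream therefore has to buffer until the record is complete before handing it to a parser.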

@daschl
Author

daschl commented Sep 18, 2017

@BurntSushi thanks very much, I'll see if I can get Read to work, if not I'll go down to csv-core. I'll report my progress in case someone else also has a similar need/question.

@daschl
Author

daschl commented Sep 18, 2017

@BurntSushi a quick follow-up if you don't mind. Is there a way to use a cursor without capturing it?

    fn run(&self, input: S) -> Box<Stream<Item = SensorEvent, Error = ()>> {
        let source_field = self.config.get_string("source").unwrap();

        let mut cursor: Cursor<Vec<u8>> = Cursor::new(vec![]);
        let mut reader = csv::Reader::from_reader(cursor); // <--- captures it

        let modified = input.filter_map(move |mut event| {
            let input_line = match event.data_mut().get(&source_field) {
                Some(&SensorDataValue::String(ref s)) => s.clone(),
                _ => return None,
            };

            cursor.write(input_line.as_bytes()); // <-- can't write to it here anymore
            for result in reader.deserialize() {
                let record: HashMap<String, SensorDataValue> = result.unwrap();
                println!("{:?}", record);
            }

            Some(event)
        });

        Box::new(modified)
    }

So in this example I'm trying to "fake" the cursor with an in-memory vec, but it's consumed by the csv::Reader so I can't push into it anymore.

@BurntSushi
Owner

@daschl In your example, you're transferring ownership of the cursor to the CSV reader, so you can't use it after that. You could just ask the CSV reader for a mutable reference to the underlying reader, and that might work. So e.g.,

reader.get_mut().write(input_line.as_bytes());
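
The ownership point can be tried out without the csv crate. The sketch below uses `std::io::BufReader` purely as a stand-in for `csv::Reader` (an assumption made so the example only needs the standard library); both take the inner reader by value and expose it again through `get_mut`. The function name `write_through_wrapper` is made up for this example.

```rust
use std::io::{BufReader, Cursor, Read, Write};

/// Moves a cursor into a wrapping reader, writes to it through the
/// wrapper's `get_mut`, and reads the bytes back.
/// `BufReader` is only a standard-library stand-in for `csv::Reader`.
fn write_through_wrapper(data: &[u8]) -> String {
    let cursor = Cursor::new(Vec::new());
    // `cursor` is moved into the wrapper; the binding is unusable afterwards.
    let mut reader = BufReader::new(cursor);

    // Borrow the inner cursor mutably instead of keeping a second handle.
    reader.get_mut().write_all(data).unwrap();
    // Writing advanced the cursor to the end, so rewind before reading.
    reader.get_mut().set_position(0);

    let mut out = String::new();
    reader.read_to_string(&mut out).unwrap();
    out
}

fn main() {
    println!("{:?}", write_through_wrapper(b"foo,bar,baz\n"));
}
```

Note the rewind: a `Cursor` shares one position between reads and writes, so data written through `get_mut` is only visible to the wrapper after the position is moved back.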

@daschl
Author

daschl commented Sep 18, 2017

@BurntSushi your approach works, but I can't figure out how to make the Reader consume the data. I tried setting the cursor back as well as seeking, but the reader never emits anything.

    fn run(&self, input: S) -> Box<Stream<Item = SensorEvent, Error = ()>> {
        let source_field = self.config.get_string("source").unwrap();

        let cursor: Cursor<Vec<u8>> = Cursor::new(vec![]);
        let mut reader = csv::Reader::from_reader(cursor);

        let modified = input.filter_map(move |mut event| {
            let input_line = match event.data_mut().get(&source_field) {
                Some(&SensorDataValue::String(ref s)) => s.clone(),
                _ => return None,
            };

            let len = input_line.len() as i64;
            reader.get_mut().write_all(input_line.as_bytes()).unwrap();
            reader.get_mut().seek(::std::io::SeekFrom::Current(-len)).unwrap(); // tried all kinds of approaches

            for result in reader.deserialize() {// <--- we never get in the loop
                let record: HashMap<String, SensorDataValue> = result.unwrap();
                println!("IN ---> {:?}", record);
            }


            Some(event)
        });

        Box::new(modified)
    }

When debug printing the reader I can see that the internal cursor of course got the data, but no state advanced.

edit: actually I think this might be correct but it still doesn't emit anything.. I must be missing something.

            let start = reader.get_mut().position();
            reader.get_mut().seek(::std::io::SeekFrom::End(0)).unwrap();
            reader.get_mut().write_all(input_line.as_bytes()).unwrap();
            reader.get_mut().set_position(start); 
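
For reference, the append-then-restore sequence above does behave as intended on a bare `Cursor<Vec<u8>>`. The following standard-library sketch (the helper names are made up for this example) shows that each appended chunk can be read back on its own, which suggests the missing output comes from state kept by the wrapping reader rather than from the cursor.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom, Write};

/// Appends `data` at the end of the buffer, then restores the cursor to
/// where it was, so the next read starts at the first unread byte.
fn append_keeping_read_position(cursor: &mut Cursor<Vec<u8>>, data: &[u8]) {
    let read_pos = cursor.position();
    cursor.seek(SeekFrom::End(0)).unwrap();
    cursor.write_all(data).unwrap();
    cursor.set_position(read_pos);
}

/// Reads everything from the current position to the end of the buffer.
fn read_rest(cursor: &mut Cursor<Vec<u8>>) -> String {
    let mut out = String::new();
    cursor.read_to_string(&mut out).unwrap();
    out
}

fn main() {
    let mut cursor = Cursor::new(Vec::new());
    for chunk in [&b"h1,h2\n"[..], &b"a,b\n"[..]] {
        append_keeping_read_position(&mut cursor, chunk);
        println!("{:?}", read_rest(&mut cursor));
    }
}
```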

@BurntSushi
Owner

BurntSushi commented Sep 19, 2017

@daschl Sorry but I don't have time to dig into your code. You might increase the chances of that happening by providing a full example that I can just run.

In the meantime, I came across the tokio_file_unix crate, and it looks like it might directly serve your interests.

I'll note that this program works for me:

extern crate csv;

use std::collections::HashMap;
use std::io::{Cursor, Write};

fn main() {
    let cursor = Cursor::new(vec![]);
    let mut reader = csv::Reader::from_reader(cursor);

    reader.get_mut().write_all(b"h1,h2,h3\nfoo,bar,baz\nabc,mno,xyz").unwrap();
    reader.get_mut().set_position(0);
    for result in reader.deserialize() {
        let record: HashMap<String, String> = result.unwrap();
        println!("{:?}", record);
    }
}

The output is:

$ cargo run                                                     
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/csv-91`
{"h2": "bar", "h1": "foo", "h3": "baz"}
{"h3": "xyz", "h1": "abc", "h2": "mno"}

@daschl
Author

daschl commented Sep 19, 2017

@BurntSushi I think I figured out what the problem is. In every sample you can find, the headers and the first line of the CSV come from the same input buffer, in which case it works fine.

But if the header line arrives first and deserialize is called, nothing is emitted, as expected. When the second line then comes along, the reader doesn't seem to "remember" the headers from the first line and still doesn't emit anything.

This works (your example), very slightly modified:

extern crate csv;

use std::collections::HashMap;
use std::io::{Cursor, Write};

fn main() {
    let cursor = Cursor::new(vec![]);
    let mut reader = csv::Reader::from_reader(cursor);

    let position = reader.get_mut().position();
    reader.get_mut().write_all(b"h1,h2,h3\nfoo,bar,baz\nabc,mno,xyz\n").unwrap();
    reader.get_mut().set_position(position);
    for result in reader.deserialize() {
        let record: HashMap<String, String> = result.unwrap();
        println!("{:?}", record);
    }
}

But this doesn't emit the second line, just the first one:

extern crate csv;

use std::collections::HashMap;
use std::io::{Cursor, Write};

fn main() {
    let cursor = Cursor::new(vec![]);
    let mut reader = csv::Reader::from_reader(cursor);

    let position = reader.get_mut().position();
    reader.get_mut().write_all(b"h1,h2,h3\nfoo,bar,baz\n").unwrap();
    reader.get_mut().set_position(position);
    for result in reader.deserialize() {
        let record: HashMap<String, String> = result.unwrap();
        println!("{:?}", record);
    }
    
    let position = reader.get_mut().position();
    reader.get_mut().write_all(b"abc,mno,xyz\n").unwrap();
    reader.get_mut().set_position(position);
    for result in reader.deserialize() {
        let record: HashMap<String, String> = result.unwrap();
        println!("{:?}", record);
    }
}

But even if I'm disabling the headers the last iteration doesn't emit anything:

extern crate csv;

use std::io::{Cursor, Write};

fn main() {
    let cursor = Cursor::new(vec![]);
    let mut reader = csv::ReaderBuilder::new().has_headers(false).from_reader(cursor);

    let position = reader.get_mut().position();
    reader.get_mut().write_all(b"h1,h2,h3\nfoo,bar,baz").unwrap();
    reader.get_mut().set_position(position);
    for result in reader.records() {
        println!("{:?}", result);
    }
    
    let position = reader.get_mut().position();
    reader.get_mut().write_all(b"abc,mno,xyz").unwrap();
    reader.get_mut().set_position(position);
    for result in reader.records()  {
        println!("{:?}", result);
    }
}

Any idea what I'm missing? What does csv need in order to emit records on subsequent deserialize/records calls?

@BurntSushi
Owner

Yeah, unfortunately the header logic is quite complicated, and it is most complex when performing seeks. It's made even worse by trying to write to the underlying reader while the CSV reader is trying to maintain its own state.

This code almost works:

extern crate csv;

use std::collections::HashMap;
use std::io::{self, Cursor, Write};

fn main() {
    let cursor = Cursor::new(vec![]);
    let mut reader = csv::Reader::from_reader(cursor);

    let position = reader.position().clone();
    reader.get_mut().write_all(b"h1,h2,h3\nfoo,bar,baz\n").unwrap();
    reader.get_mut().set_position(position.byte());
    reader.seek_raw(io::SeekFrom::Start(position.byte()), position).unwrap();
    for result in reader.deserialize() {
        let record: HashMap<String, String> = result.unwrap();
        println!("{:?}", record);
    }

    let position = reader.position().clone();
    reader.get_mut().write_all(b"abc,mno,xyz\n").unwrap();
    reader.seek_raw(io::SeekFrom::Start(position.byte()), position).unwrap();
    for result in reader.deserialize() {
        let record: HashMap<String, String> = result.unwrap();
        println!("{:?}", record);
    }
}

The only flaw is that it emits the header row with its keys mapped to themselves, so you might need some additional state to skip the very first row you read.

The only real trick in this scenario is the use of the CSV reader's seek_raw method. We want that for two reasons:

  1. The CSV reader has its own state, so you should always try to seek with it instead of seeking the underlying reader directly.
  2. The seek method on the CSV reader is sometimes clever and "knows" when not to seek because it assumes the caller isn't manipulating the underlying reader. But you are in this case, so you need to use seek_raw which will always seek the underlying reader. This eliminates the need to call set_position explicitly. (Unfortunately, you still need to call set_position for the first read because the first actual call to seek_raw will attempt to read the headers, since the CSV reader knows that it hasn't read anything yet.)

Most or all of this is actually documented on the CSV reader's seek and seek_raw methods.
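
As a rough mental model of why a stateful reader can ignore data appended behind its back, here is a toy standard-library sketch. Everything in it is hypothetical: `LatchingReader` is not the csv crate's implementation, and its `reset` method only plays the role that `seek_raw` plays in the code above, under the assumption that the real reader also keeps some "already reached the end" state.

```rust
use std::io::{Cursor, Read, Write};

/// Hypothetical wrapper that remembers having seen end-of-input.
struct LatchingReader {
    inner: Cursor<Vec<u8>>,
    done: bool,
}

impl LatchingReader {
    fn new() -> Self {
        LatchingReader { inner: Cursor::new(Vec::new()), done: false }
    }

    /// Appends bytes to the buffer while leaving the read position alone.
    fn push(&mut self, data: &[u8]) {
        let read_pos = self.inner.position();
        let end = self.inner.get_ref().len() as u64;
        self.inner.set_position(end);
        self.inner.write_all(data).unwrap();
        self.inner.set_position(read_pos);
    }

    /// Returns the unread bytes, or an empty string once the end was seen.
    fn read_available(&mut self) -> String {
        if self.done {
            return String::new();
        }
        let mut out = String::new();
        self.inner.read_to_string(&mut out).unwrap();
        // `read_to_string` always runs to end-of-input, so latch here.
        self.done = true;
        out
    }

    /// Clears the latch so that newly appended bytes become visible.
    fn reset(&mut self) {
        self.done = false;
    }
}

fn main() {
    let mut reader = LatchingReader::new();
    reader.push(b"a,b\n");
    println!("{:?}", reader.read_available());
    reader.push(b"c,d\n");
    println!("{:?}", reader.read_available()); // latched: nothing new
    reader.reset();
    println!("{:?}", reader.read_available());
}
```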

@daschl
Author

daschl commented Sep 19, 2017

@BurntSushi thanks, I think that gives me enough to move further. I'll provide the final solution here once I got it working :)

@BurntSushi
Owner

@daschl Awesome! Look forward to it!

@daschl
Author

daschl commented Sep 20, 2017

@BurntSushi I think I got it sorted out. The only thing that worries me a bit is that I keep extending my cursor, and I don't want the Vec to grow without bound. I may just need to reset it back to 0 all the time so that the reader just has one line to parse.

Note that I now use records and skip the header specialization and set it manually in an Option. This also allows me in the future to specify a list of headers manually in case the csv doesn't have it at the top:

    fn run(&self, input: S) -> Box<Stream<Item = SensorEvent, Error = ()>> {
        let source_field = self.config.get_string("source").unwrap();

        let cursor: Cursor<Vec<u8>> = Cursor::new(vec![]);
        let mut reader = csv::ReaderBuilder::new()
            .has_headers(false) // <--- avoid header specialization
            .from_reader(cursor);

        let mut headers: Option<StringRecord> = None; // <--- store the headers here
        let modified = input.filter_map(move |mut event| {
            let input_line = match event.data_mut().get(&source_field) {
                Some(&SensorDataValue::String(ref s)) => s.clone(),
                _ => return None,
            };


            let position = reader.position().clone();
            reader.get_mut().write_all(input_line.as_bytes()).unwrap();
            reader
                .seek_raw(io::SeekFrom::Start(position.byte()), position)
                .unwrap(); // <--- seek as you said above

            for result in reader.records() {
                if headers.is_none() { // record is treated as headers if not set
                    headers = Some(result.unwrap());
                    return None; // don't emit the headers as a downstream event
                }

                // <--- still do the serde deserialization and then add it to my event data
                let mut record: HashMap<String, SensorDataValue> =
                    result.unwrap().deserialize(headers.as_ref()).unwrap();
                event.data_mut().extend(record.drain());
            }

            Some(event)
        });

        Box::new(modified)
    }
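
The header handling in this snippet (keep the first record in an Option, then pair every later record with it) can be sketched on its own with the standard library. `RowMapper` and `push_line` are hypothetical names for this example, and fields are split on plain commas, so quoting is not handled the way the csv crate handles it.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the state captured by the closure above.
struct RowMapper {
    headers: Option<Vec<String>>,
}

impl RowMapper {
    fn new() -> Self {
        RowMapper { headers: None }
    }

    /// Feeds one line. Returns `None` for the header line and a
    /// header -> value map for every later line.
    /// Fields are split on commas only; quoting is NOT handled.
    fn push_line(&mut self, line: &str) -> Option<HashMap<String, String>> {
        let fields: Vec<String> =
            line.trim_end().split(',').map(String::from).collect();
        if let Some(headers) = &self.headers {
            // Pair each stored header with the field in the same column.
            return Some(headers.iter().cloned().zip(fields).collect());
        }
        // The first line seen becomes the header row and is not emitted.
        self.headers = Some(fields);
        None
    }
}

fn main() {
    let mut mapper = RowMapper::new();
    for line in ["h1,h2,h3\n", "foo,bar,baz\n", "abc,mno,xyz\n"] {
        println!("{:?}", mapper.push_line(line));
    }
}
```

Pre-populating `headers` with a fixed list would cover input that has no header row, which is the future use mentioned above.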

@daschl
Author

daschl commented Sep 20, 2017

OK, so here is what I ended up with: I set the position back to 0 for every row, since I know rust-csv consumes the whole line anyway.

            let mut position = reader.position().clone();
            position.set_byte(0);
            reader.get_mut().set_position(position.byte());
            reader.get_mut().write_all(input_line.as_bytes()).unwrap();
            reader
                .seek_raw(io::SeekFrom::Start(position.byte()), position)
                .unwrap();
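
One thing to watch for when reusing the buffer this way: writing at position 0 of a `Cursor<Vec<u8>>` overwrites bytes but does not truncate the Vec. If a later line is shorter than an earlier one, the tail of the earlier line stays in the buffer and may be read as extra data. The standard-library sketch below (function names are made up for this example) shows the effect and one possible remedy, clearing the Vec through `Cursor::get_mut` before writing. Whether this matters in practice depends on the input, so treat it as a caution rather than a confirmed bug in the snippet above.

```rust
use std::io::{Cursor, Write};

/// Overwrites from position 0 without clearing the buffer first.
fn overwrite_only(cursor: &mut Cursor<Vec<u8>>, line: &[u8]) {
    cursor.set_position(0);
    cursor.write_all(line).unwrap();
    cursor.set_position(0);
}

/// Clears the buffer first, so nothing from the previous line survives.
fn clear_then_write(cursor: &mut Cursor<Vec<u8>>, line: &[u8]) {
    cursor.get_mut().clear();
    cursor.set_position(0);
    cursor.write_all(line).unwrap();
    cursor.set_position(0);
}

fn main() {
    let mut stale = Cursor::new(Vec::new());
    overwrite_only(&mut stale, b"aaaa,bbbb\n");
    overwrite_only(&mut stale, b"c,d\n");
    // The end of the first line is still present after the second one.
    println!("{:?}", String::from_utf8_lossy(stale.get_ref()));

    let mut clean = Cursor::new(Vec::new());
    clear_then_write(&mut clean, b"aaaa,bbbb\n");
    clear_then_write(&mut clean, b"c,d\n");
    println!("{:?}", String::from_utf8_lossy(clean.get_ref()));
}
```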

I'll close it, thanks very much for your help @BurntSushi

@daschl daschl closed this as completed Sep 20, 2017
@BurntSushi
Owner

@daschl Glad you found something that works!

@Restioson

Restioson commented Sep 29, 2017

I'd like to say a huge thanks for creating this thread @daschl. Was struggling with this for ages. PS, mind if I use your code snippet?

@daschl
Author

daschl commented Sep 29, 2017

@Restioson please go ahead and use it, happy I could help someone else with it too. Let me know if you have further questions :)

@daschl
Author

daschl commented Sep 29, 2017

I wonder if we should put an example together, since it looks like more people need something like this, and this thread, now that it's closed, is kind of hidden. I'll see if I can come up with a self-contained PR that showcases it.

@Restioson

Might it even be worth having a helper struct, something like StreamingCsvReader? It could wrap CsvReader and basically implement this, but with simpler methods, like StreamingCsvReader.parse(xyz...)

@BurntSushi
Owner

An example to add to the cookbook would be great!

@Restioson

On a similar note, I've tried to implement this, but to no avail: https://gist.github.com/765d46640b29d8deb6863f4b895bd085 (missing field latitude)... This works without the cursor hackery. Could anyone help to shed light?

@BurntSushi
Owner

@Restioson I don't have time to read that much code right now unfortunately. If you can post a smaller (preferably minimal) example that reproduces your problem, then I might be able to take a look.

@Restioson

I'm not sure exactly which bit caused the problem here, but I can probably cut down some of the less necessary stuff, like the full struct definitions and whatnot, so I'll try to get to that tomorrow. (Side note: it looks like I accidentally shared the gist link instead of the playground link.)

@Restioson

Restioson commented Oct 12, 2017

@BurntSushi I refactored and made my code a bit smaller. However, it works without the cursor hackery.

@BurntSushi
Owner

@Restioson Thanks. The error is much clearer now. I think I said this before, but the key is realizing that any call to seek or seek_raw on the CSV reader will attempt to read the headers if it hasn't already. So you need to make sure you move the cursor position back before seeking: https://play.rust-lang.org/?gist=1d37802118e8a10bb11570d444fca679&version=stable

@Restioson

Wow, I feel dumb! Thanks so so much!
