
RowGroupReader.get_row_iter() fails with Path ColumnPath not found #5064

Closed
mmaitre314 opened this issue Nov 11, 2023 · 8 comments · Fixed by #5102
Labels
bug · parquet (Changes to the parquet crate)

Comments

@mmaitre314
Contributor

Describe the bug
I am trying to read a Parquet file generated by a Hadoop MapReduce job. The schema is a bit complex, with a minimal repro looking something like this:

message schema {
    REPEATED group level1 {
        REPEATED group level2 {
            REQUIRED group level3 {
                REQUIRED INT64 value3;
            }
        }
        REQUIRED INT64 value1;
    }
}

Reading this schema fails with `Path ColumnPath { parts: ["value1"] } not found`. The error message is accurate in that `value1` by itself does not exist: the path is off by one level and should be `level1.value1`.

Looking into the code, it seems like there is a double path.pop() happening in reader_tree().

To Reproduce
Minimal unit test reproducing the issue:

  • Cargo.toml
[package]
name = "parquet_complex_type"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1.5.0"
parquet = "48.0.0"
  • src/lib.rs
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use std::sync::Arc;
    use parquet::data_type::Int64Type;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::parser::parse_message_type;

    #[test]
    fn test_read_write_parquet2() {
        // Create schema
        let schema = Arc::new(parse_message_type("
            message schema {
                REPEATED group level1 {
                    REPEATED group level2 {
                        REQUIRED group level3 {
                            REQUIRED INT64 value3;
                        }
                    }
                    REQUIRED INT64 value1;
                }
            }").unwrap());

        // Write Parquet file to buffer
        let mut buffer: Vec<u8> = Vec::new();
        let mut file_writer = SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
        let mut row_group_writer = file_writer.next_row_group().unwrap();

        // Write column level1.level2.level3.value3
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer.typed::<Int64Type>().write_batch(&[30, 31, 32], Some(&[2, 2, 2]), Some(&[0, 0, 0])).unwrap();
        column_writer.close().unwrap();

        // Write column level1.value1
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer.typed::<Int64Type>().write_batch(&[10, 11, 12], Some(&[1, 1, 1]), Some(&[0, 0, 0])).unwrap();
        column_writer.close().unwrap();

        // Finalize Parquet file
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();
        assert_eq!(&buffer[0..4], b"PAR1");

        // Read Parquet file from buffer
        let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
        assert_eq!(3, reader.metadata().file_metadata().num_rows());
        let row_group_reader = reader.get_row_group(0).unwrap();
        let mut rows = row_group_reader.get_row_iter(None).unwrap();
        assert_eq!(rows.next().unwrap().unwrap().to_string(), "{level1: [{level2: [{level3: {value3: 30}}], value1: 10}]}");
        assert_eq!(rows.next().unwrap().unwrap().to_string(), "{level1: [{level2: [{level3: {value3: 31}}], value1: 11}]}");
        assert_eq!(rows.next().unwrap().unwrap().to_string(), "{level1: [{level2: [{level3: {value3: 32}}], value1: 12}]}");
    }
}

Expected behavior
Reading the Parquet file succeeds.

Additional context

A fix may be to push back the value just popped so that it can be popped again right afterward. This 2-line change in reader_tree() got the test to pass. I am very new to this codebase, but I'm happy to send it as a pull request along with the test if the fix makes sense.

	_ if repetition == Repetition::REPEATED => {
		let required_field = Type::group_type_builder(field.name())
			.with_repetition(Repetition::REQUIRED)
			.with_converted_type(field.get_basic_info().converted_type())
			.with_fields(field.get_fields().to_vec())
			.build()?;

		let value = path.pop().unwrap();        // <-- keep the path value here

		let reader = self.reader_tree(
			Arc::new(required_field),
			path,
			curr_def_level,
			curr_rep_level,
			paths,
			row_group_reader,
		)?;

		path.push(value);                        // <-- push the path value back here

		Reader::RepeatedReader(
			field,
			curr_def_level - 1,
			curr_rep_level - 1,
			Box::new(reader),
		)
	}
mmaitre314 added the bug label Nov 11, 2023
@tustvold
Contributor

tustvold commented Nov 11, 2023

Possibly a duplicate of #2394

Currently repeated fields are only supported by the arrow interface, or the lower level ColumnReader

@mmaitre314
Contributor Author

That indeed looks related to #2394. Reading through the issue conversation, I also hit the ``called `Option::unwrap()` on a `None` value`` panic mentioned in the related #3745 (though I don't have a good repro right now).

It sounds like there isn't enough volunteer time to expand get_row_iter(). Would reviewing pull requests in that code be an option? If the issues can be tackled in bite-sized chunks, I can likely contribute. So far, testing the decoding of REPEATED fields has actually looked pretty good. For instance, this non-trivial schema seems to be handled fine (at least in the case of a single repetition):

message schema {
    REQUIRED BYTE_ARRAY value1 (UTF8);
    OPTIONAL BYTE_ARRAY value2 (UTF8);
    REQUIRED INT64 value3;
    REPEATED group level1 {
        REQUIRED BYTE_ARRAY value4 (UTF8);
        OPTIONAL INT64 value5;
        OPTIONAL BYTE_ARRAY value6 (UTF8);
    }
}

Doc/example contributions could also be fair game. I was also thinking about adding something around ObjectStore + Parquet async: I had a hard time figuring that out, but it actually works really well, and I think I understand that part well enough now to expand the docs.

@tustvold
Contributor

tustvold commented Nov 11, 2023

I can't promise speedy reviews, but I can try to review PRs. I would ask, though, that if you plan large feature work in this area, you file tickets first to get feedback on what you propose.

That being said, I want to set expectations: these APIs will always be orders of magnitude slower than their columnar brethren. There is also a huge amount of subtlety in correctly handling nested schemas in all the weird forms they come in, which the current row readers don't even attempt to handle. If you want a hobby project to work on, I'm happy to help, but if you're looking to base an application around these APIs, I would strongly encourage just using the arrow interface.

@mmaitre314
Contributor Author

Sounds fair. I am going to run tests on more datasets to see whether the scenario can be unblocked through a targeted fix, and if not I'll resolve this as Won't Fix. The call to get_row_iter() happens in a codebase I do not own, so switching it to arrow seems unlikely. Thanks for the speedy replies, and for all the work everyone has done on this crate. Much appreciated.

@mmaitre314
Contributor Author

To get started, I sent PR #5093 to expand the crate docs with a summary of the discussion here and in #2394.

@mmaitre314
Contributor Author

@tustvold: would you be able to take a look at PR #5102? I tried to keep the change minimal: 2 lines of actual code change, with the rest being tests.

@tustvold
Contributor

Apologies, it is on my list, but I've been a bit swamped recently and I need some time to sit down and learn how that code works before I can review effectively.

tustvold added the parquet label Jan 5, 2024
@tustvold
Contributor

tustvold commented Jan 5, 2024

label_issue.py automatically added labels {'parquet'} from #5093
