Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet write failure (from record batches) when data is nested two levels deep #1744

Closed
ahmedriza opened this issue May 24, 2022 · 3 comments · Fixed by #1746
Closed

Parquet write failure (from record batches) when data is nested two levels deep #1744

ahmedriza opened this issue May 24, 2022 · 3 comments · Fixed by #1746
Labels
bug parquet Changes to the parquet crate

Comments

@ahmedriza
Copy link

ahmedriza commented May 24, 2022

Describe the bug
Let me introduce the Schema of the data in an easily readable format (the Apache Spark pretty print format):

root
 |-- id: string (nullable = true)
 |-- prices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- value: double (nullable = true)
 |    |    |-- meta: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- loc: string (nullable = true)
 |-- bids: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- value: double (nullable = true)
 |    |    |-- meta: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- loc: string (nullable = true)

and some sample data:

+---+----------------------+----+
|id |prices                |bids|
+---+----------------------+----+
|t1 |[{GBP, 3.14, [{LON}]}]|null|
|t2 |[{USD, 4.14, [{NYC}]}]|null|
+---+----------------------+----+

As we can see, what we have here are three columns, a UTF-8 column called id and two columns called prices and bid that have the schema, i.e. list<struct<list>>.

I have deliberately left the bids column empty to show the bug.

The bug is that when when we read Parquet with the above schema, with the bids column null for all rows, from Rust code into record batches and then write those record batches to Parquet; the Parquet write fails with:

Error: Parquet error: Incorrect number of rows, expected 2 != 0 rows

This is happening at https://github.com/apache/arrow-rs/blob/master/parquet/src/file/writer.rs#L324 and is due to the fact that the bids column is null.

To Reproduce

Let's create the sample data using the schema as depicted above using the following Python code:

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

d1 = {
    "id": pd.Series(['t1', 't2']),
    "prices": pd.Series([
        [
            {
                "currency": "GBP",
                "value": 3.14,
                'meta': [
                    {'loc': 'LON'}
                ]
            }
        ],
        [
            {
                "currency": "USD",
                "value": 4.14,
                'meta': [
                    {'loc': 'NYC'}
                ]                
            }
        ]
    ]),
    "bids": pd.Series([], dtype='object')
}

df = pd.DataFrame(d1)

list_type = pa.list_(
    pa.struct([
        ('currency', pa.string()),
        ('value', pa.float64()),
        ('meta', pa.list_(
            pa.struct([
                ('loc', pa.string())
            ])
        ))
    ]))

schema = pa.schema([
    ('id', pa.string()),
    ('prices', list_type),
    ('bids', list_type)
])

table = pa.Table.from_pandas(df, schema=schema)
filename = '/tmp/demo_one_arrow.parquet'
pq.write_table(table, filename)

expected_table = pq.read_table(filename).to_pandas()
print(expected_table.to_string())

When we run this code, we can see that a valid Parquet file is indeed produced. The Parquet that is created is read back and we see the following:

   id                                                          prices  bids
0  t1  [{'currency': 'GBP', 'value': 3.14, 'meta': [{'loc': 'LON'}]}]  None
1  t2  [{'currency': 'USD', 'value': 4.14, 'meta': [{'loc': 'NYC'}]}]  None

Let's now try to read the same Parquet from Rust and write it back to another Parquet file:

use std::{fs::File, path::Path, sync::Arc};

use arrow::record_batch::RecordBatch;
use parquet::{
    arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
    file::serialized_reader::SerializedFileReader,
};
use std::error::Error;
type Result<T> = std::result::Result<T, Box<dyn Error>>;

pub fn main() -> Result<()> {
    let filename = "/tmp/demo_one_arrow.parquet";

    // Read Parquet from file
    let record_batches = read_parquet(filename)?;

    println!("Writing Parquet...");
    // write what we just read
    write_parquet("/tmp/demo_one_arrow2.parquet", record_batches)?;

    println!("Reading back...");
    // Read back what we just wrote
    let expected_batches = read_parquet("/tmp/demo_one_arrow2.parquet")?;
    let _expected_columns = expected_batches
        .iter()
        .map(|rb| rb.columns())
        .collect::<Vec<_>>();
    
    Ok(())
}

fn read_parquet(filename: &str) -> Result<Vec<RecordBatch>> {
    let path = Path::new(filename);
    let file = File::open(path)?;
    println!("Reading {}", filename);
    let reader = SerializedFileReader::new(file)?;
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
    let rb_reader = arrow_reader.get_record_reader(1024)?;
    let mut record_batches = vec![];
    for rb_result in rb_reader {
        let rb = rb_result?;
        record_batches.push(rb);
    }
    Ok(record_batches)
}

fn write_parquet(filename: &str, record_batches: Vec<RecordBatch>) -> Result<()> {
    let file = File::create(filename)?;
    let schema = record_batches[0].schema();
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    for batch in record_batches {
        writer.write(&batch)?;
    }
    writer.close()?;
    Ok(())
}

This reads the Parquet file fine, but fails when writing out the record batches as Parquet with the following error:

Error: Parquet error: Incorrect number of rows, expected 2 != 0 rows

I can see that this is due to the fact the bids column is null.

Expected behavior

We should expect the record batches to be written correctly to Parquet even if a column is null for all rows.

Additional context
The issue arises due to the presence of the second level of nesting, i.e. the following

('meta', pa.list_(
            pa.struct([
                ('loc', pa.string())
            ])
        ))

If we remove this second level of nesting, then the null bids column does get written. However, we expect this to work even in the presence of the second, third etc level of nesting, which works with pyarrow as well.

@ahmedriza ahmedriza added the bug label May 24, 2022
@ahmedriza ahmedriza changed the title Parquet write failure when data is nested two levels deep Parquet write failure (from record batches) when data is nested two levels deep May 24, 2022
@tustvold
Copy link
Contributor

This looks very similar to #1651 which fixed the read side, there is likely a similar issue on the write side. Thank you for the report, I'll take a look tomorrow

@ahmedriza
Copy link
Author

Cool @tustvold. I do recall the reader side error as well before version 14. Thanks a lot.

@alamb
Copy link
Contributor

alamb commented May 26, 2022

For anyone following along, there is a PR proposing to fix this: #1746

tustvold added a commit that referenced this issue May 27, 2022
* Support writing arbitrarily nested arrow arrays (#1744)

* More tests

* Port more tests

* More tests

* Review feedback

* Reduce test churn

* Port remaining tests

* Review feedback

* Fix clippy
@alamb alamb added the parquet Changes to the parquet crate label May 27, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants