Conversion from one dataset to another that will not fit in memory? #12653

Open
eitsupi opened this issue Mar 17, 2022 · 6 comments
eitsupi commented Mar 17, 2022

After finding the following description in the documentation, I tried scanning a dataset larger than memory and writing it to another dataset.

https://arrow.apache.org/docs/python/dataset.html#writing-large-amounts-of-data

The above examples wrote data from a table. If you are writing a large amount of data you may not be able to load everything into a single in-memory table. Fortunately, the write_dataset() method also accepts an iterable of record batches. This makes it really simple, for example, to repartition a large dataset without loading the entire dataset into memory:

# Python
import pyarrow.dataset as ds

input_dataset = ds.dataset("input")
ds.write_dataset(input_dataset.scanner(), "output", format="parquet")

# R
arrow::open_dataset("input") |>
  arrow::write_dataset("output")

But both the Python and R versions crashed on Windows due to lack of memory. Am I missing something?
Is there a recommended way to convert one dataset to another without running out of computer memory?

@wjones127
Member

Hi @eitsupi ,

Depending on your memory restrictions, you may need to control the batch size (how many rows are loaded at once) on the scanner:

import pyarrow.dataset as ds

input_dataset = ds.dataset("input")
scanner = input_dataset.scanner(batch_size=100_000)  # default is 1_000_000
ds.write_dataset(scanner.to_reader(), "output", format="parquet")

Does that help in your use case?

@westonpace
Member

At the moment we generally use too much memory when scanning parquet. This is because the scanner's readahead is unfortunately based on the row group size and not the batch size. Using smaller row groups in your source files will help. #12228 changes the readahead to be based on the batch size but it's been on my back burner for a bit. I'm still optimistic I will get to it for the 8.0.0 release.
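For anyone who wants to try the smaller-row-group workaround in the meantime, here is a minimal sketch (the paths are placeholders, and it assumes each individual source file still fits in memory) that rewrites one Parquet file with smaller row groups:

import pyarrow.parquet as pq

# Hypothetical paths; repeat for each file in the source dataset.
# row_group_size caps the rows per row group, so the scanner's
# row-group-based readahead holds less data in memory at once.
table = pq.read_table("input/part-0.parquet")
pq.write_table(table, "input_small_row_groups/part-0.parquet", row_group_size=100_000)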

eitsupi commented Mar 18, 2022

Thank you both.
I tried lowering the batch size to 1000 in Python, but it still consumed over 3GB of memory and crashed.

I will wait for the 8.0.0 release to try this again.

eitsupi commented May 10, 2022

I tried pyarrow 8.0.0 and unfortunately it still crashes.

@willbowditch

I'm seeing the same thing in pyarrow 8.0.0 when converting from CSV to Parquet - I've tried various batch sizes on the scanner and various min/max rows-per-group settings on the writer.

Running in a container, memory usage climbs to the limit and the process eventually crashes.

from pathlib import Path

import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds
tsv_directory_path = Path("/dir/with/tsv")

read_schema = pa.schema([...])


input_tsv_dataset = ds.dataset(
    tsv_directory_path,
    read_schema,
    format=ds.CsvFileFormat(
        parse_options=csv.ParseOptions(delimiter="\t", quote_char=False)
    ),
)


scanner = input_tsv_dataset.scanner(batch_size=100)

ds.write_dataset(
    scanner,
    "output_directory.parquet",
    format="parquet",
    max_rows_per_file=10000,
    max_rows_per_group=10000,
    min_rows_per_group=10,
)

Using csv.open_csv and pq.ParquetWriter to write batches works fine, but it results in a single large output file.
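For reference, a minimal sketch of that single-file streaming approach (paths are placeholders, and the parse options mirror the TSV settings above): csv.open_csv yields record batches incrementally, and pq.ParquetWriter appends them to one output file.

import pyarrow.csv as csv
import pyarrow.parquet as pq

# Hypothetical input path; parse options mirror the TSV settings above.
reader = csv.open_csv(
    "/dir/with/tsv/data.tsv",
    parse_options=csv.ParseOptions(delimiter="\t", quote_char=False),
)

# Stream record batches straight into a single Parquet file.
with pq.ParquetWriter("output.parquet", reader.schema) as writer:
    for batch in reader:
        writer.write_batch(batch)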

@VHellendoorn

I am noticing the same issue with pyarrow 8.0.0. Memory usage steadily increases to over 10GB while reading batches from a 15GB Parquet file, even with batch size 1. The rows vary a fair bit in size in this dataset, but not enough to require that much RAM.

For what it's worth, I've found that passing use_threads=False as an argument to scanner keeps the memory footprint from growing as large (it stays under ~3GB in this case, though it still fluctuates a fair bit); I noticed that this implicitly disables both batch and fragment readahead here. The performance penalty isn't particularly large, especially with bigger batch sizes, so this may be a temporary workaround for those wishing to keep memory usage low.
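For reference, a minimal sketch of that workaround (dataset paths are placeholders); it is the same scanner-to-writer pipeline as above, just with use_threads=False:

import pyarrow.dataset as ds

input_dataset = ds.dataset("input")
# use_threads=False implicitly disables batch and fragment readahead,
# trading some throughput for a smaller peak memory footprint.
scanner = input_dataset.scanner(batch_size=100_000, use_threads=False)
ds.write_dataset(scanner.to_reader(), "output", format="parquet")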
