
Think about how to aggregate and/or scatter chunks when copying #24

Closed · 3 tasks
JackKelly opened this issue Jan 17, 2024 · 5 comments
Comments

@JackKelly (Owner) commented Jan 17, 2024

For example, each init time of the ICON NWP data consists of 146,000 GRIB2 files: one file for each combination of vertical level, variable, and timestep.

Jacob has a script which converts ICON NWP GRIB2 files to Zarr. The conversion takes 0.5 to 1 hour per init time, reading from a local SSD on one of Open Climate Fix's on-prem computers, and the output Zarr (compressed with Blosc2, no sub-selection) is about 50 GBytes per init time.

(Did I get the details right, @jacobbieker?!)

How to merge multiple input files into a single output file in LSIO?

Maybe we should pass back to Python (async)? But that'll have a large performance impact.

Is there a way for users to tell LSIO to merge or split input files, in arbitrary, user-configurable ways?

TODO:

  • Specify the function signatures for read_then_write and read_map_write (don't worry about merging at first; a rough sketch follows this list).
  • Think about how to specify merging chunks
  • Think about how to implement merging chunks
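As a concrete starting point for the first task, here is a minimal sketch of what those two signatures might look like. Every name here (ByteRange, the ChunkCopier trait) is a placeholder invented for discussion, not an existing LSIO definition:

use std::path::PathBuf;

/// Placeholder byte-range type; not an existing LSIO definition.
pub struct ByteRange {
    pub start: u64,
    pub end: u64,
}

/// Hypothetical trait grouping the two simplest operations, ignoring merging.
pub trait ChunkCopier {
    /// Copy byte ranges from source files straight to destination files, unchanged.
    fn read_then_write(
        &self,
        src: &[(PathBuf, ByteRange)],
        dst: &[(PathBuf, ByteRange)],
    ) -> std::io::Result<()>;

    /// Read byte ranges, apply a per-chunk transform (e.g. decompress or recompress),
    /// then write each transformed chunk to its destination.
    fn read_map_write(
        &self,
        src: &[(PathBuf, ByteRange)],
        map: &(dyn Fn(&[u8]) -> Vec<u8> + Send + Sync),
        dst: &[(PathBuf, ByteRange)],
    ) -> std::io::Result<()>;
}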
@JackKelly JackKelly self-assigned this Jan 17, 2024
@JackKelly JackKelly moved this to Todo in light-speed-io Jan 17, 2024
@jacobbieker

Yes, that's right. The final ICON-EU Zarr is actually about 20 GB; ICON-Global is about 50 GB. Processed examples are here: https://huggingface.co/datasets/openclimatefix/dwd-icon-eu/tree/main/data and the raw data is here: https://opendata.dwd.de/weather/nwp/icon-eu/grib/06/

@JackKelly (Owner, Author) commented Jan 17, 2024

Thinking this through:

We could think about this in terms of "scatter" and "gather" operations. Or maybe that's too restrictive, as illustrated by the example below.

Copying chunks from many GRIB2 files into a single Zarr chunk would be a "gather" op. To make life complicated, let's assume the GRIB2 files are compressed, and that we want to put half of each GRIB2 file into ZarrA and the other half into ZarrB. It would go something like this:

  1. Pre-allocate memory buffers for ZarrA and ZarrB.
  2. Create an AtomicUInt for ZarrA, and another for ZarrB. Each counts the number of loading operations still outstanding before that buffer is complete.
  3. For each GRIB2 file:
    1. Load the entire GRIB2 file
    2. Decompress it
    3. Split it in half
    4. Copy each half into the appropriate location in the ZarrA and ZarrB buffers
    5. Decrement the relevant AtomicUInts
    6. If an AtomicUInt reaches zero, compress that Zarr buffer & write it to disk?

TODO: Think about this more! This is a very early draft!
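A minimal sketch of the counter-based gather in steps 2 and 3.4–3.6, assuming one pre-allocated buffer per output Zarr chunk. The OutputBuffer type and its methods are hypothetical, and a Mutex stands in for whatever lock-free scheme a real implementation would use:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// One output Zarr chunk being assembled from many decompressed GRIB2 halves.
struct OutputBuffer {
    /// Pre-allocated, uncompressed Zarr chunk. A Mutex keeps this sketch simple;
    /// real code could write to disjoint slices without locking.
    buf: Mutex<Vec<u8>>,
    /// Number of GRIB2 halves still outstanding before this buffer is complete.
    outstanding: AtomicUsize,
}

impl OutputBuffer {
    fn new(len: usize, n_inputs: usize) -> Arc<Self> {
        Arc::new(Self {
            buf: Mutex::new(vec![0u8; len]),
            outstanding: AtomicUsize::new(n_inputs),
        })
    }

    /// Copy one decompressed GRIB2 half into its slot. Returns true if this was
    /// the last outstanding half, i.e. the buffer is now ready to compress & write.
    fn insert(&self, offset: usize, bytes: &[u8]) -> bool {
        let mut buf = self.buf.lock().unwrap();
        buf[offset..offset + bytes.len()].copy_from_slice(bytes);
        // fetch_sub returns the previous value, so 1 means we just reached zero.
        self.outstanding.fetch_sub(1, Ordering::AcqRel) == 1
    }
}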

JackKelly added a commit that referenced this issue Jan 17, 2024
@JackKelly (Owner, Author) commented Jan 17, 2024

We could think of this as:

  1. Read each individual source file
  2. Map each source file (decompress)
  3. Reduce (merge the first half of all GRIB files into ZarrA; and the second half into ZarrB)
  4. Map (compress each Zarr chunk)
  5. Write the compressed Zarrs to disk

So maybe there is a general-purpose data structure to specify this. Something like:

struct ReadMapReduceMapWrite { // Horrible name!
    source_file_chunks: Vec<(PathBuf, ByteRanges)>,

    // As soon as LSIO finishes reading these source_file_chunks, LSIO
    // will start reading the source_file_chunks for the next ReadMapReduceMapWrite,
    // so LSIO can overlap reading from IO with running reduce_func and dst_map_func.

    /// Applied to each byte range? Or applied to each (PathBuf, ByteRanges)?
    source_map_func: Box<dyn Fn(&[u8]) -> Vec<u8>>,

    /// Takes as input all the outputs of source_map_func, along with their source
    /// locations, and outputs one or more buffers, each with its destination.
    reduce_func: Box<dyn Fn(ArrayQueue<(Vec<u8>, PathBuf, ByteRange)>) -> Vec<(Vec<u8>, PathBuf, ByteRange)>>,

    /// Applied in parallel to each output of reduce_func.
    /// Data is written after reduce_func completes.
    dst_map_func: Box<dyn Fn(&[u8]) -> Vec<u8>>,
}
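For example, the GRIB-to-Zarr gather above might be expressed roughly like this (decompress_grib, merge_halves_into_zarr_buffers and compress_zarr are hypothetical helpers, as is ByteRanges::whole_file()):

let task = ReadMapReduceMapWrite {
    source_file_chunks: grib_paths
        .iter()
        .map(|path| (path.clone(), ByteRanges::whole_file()))
        .collect(),
    // Decompress each GRIB2 chunk as it arrives.
    source_map_func: Box::new(|compressed| decompress_grib(compressed)),
    // Gather the first half of every GRIB into ZarrA's buffer, the second half into ZarrB's.
    reduce_func: Box::new(|decompressed_gribs| merge_halves_into_zarr_buffers(decompressed_gribs)),
    // Compress each output Zarr chunk before it is written.
    dst_map_func: Box::new(|zarr_chunk| compress_zarr(zarr_chunk)),
};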

@JackKelly (Owner, Author) commented Jan 17, 2024

Or, put more responsibility on the user's code.

The reader and writers would run in their own threads, separate from the main thread. We'd receive groups of decompressed GRIBs from a channel. And then we'd send uncompressed output buffers to another channel.

Something like this:

let source_receiver = read_and_map([gribs_for_zarrA_and_zarrB, gribs_for_zarrC_and_zarrD], decompress_func);
for decompressed_gribs in source_receiver {
    let outputs_for_task = merge_into_zarrs(decompressed_gribs);
    for output_buf in outputs_for_task {
        writer_sender.send(MapAndWrite { map: compression, buf: output_buf, path: "foo", byte_ranges: vec![...] });
    }
}

I think this second approach might be more flexible. And, unlike the ReadMapReduceMapWrite approach, it's clear how this "channels" approach would allow the source storage subsystem to be different from the destination storage (e.g. reading from local SSD and writing to a cloud storage bucket). So maybe I'm leaning towards this second approach, even though it involves yet more threads!
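A rough sketch of the writer-side plumbing behind that snippet, using std::sync::mpsc for illustration. The MapAndWrite message type is hypothetical, and a real implementation would probably use bounded channels and an io_uring-backed writer:

use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

/// Hypothetical message asking the writer thread to map (e.g. compress) a buffer
/// and then write the result to the given path and byte ranges.
struct MapAndWrite {
    map: fn(&[u8]) -> Vec<u8>,
    buf: Vec<u8>,
    path: PathBuf,
    byte_ranges: Vec<std::ops::Range<u64>>,
}

/// Spawn a writer thread and return the channel end that user code sends tasks to.
fn spawn_writer() -> mpsc::Sender<MapAndWrite> {
    let (writer_sender, writer_receiver) = mpsc::channel::<MapAndWrite>();
    thread::spawn(move || {
        for task in writer_receiver {
            let mapped = (task.map)(&task.buf);
            // Actually writing `mapped` to task.path at task.byte_ranges is omitted in this sketch.
            let _ = (mapped, task.path, task.byte_ranges);
        }
    });
    writer_sender
}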

@JackKelly (Owner, Author)

The thought process which started with this issue has ended with me totally changing the API design! So this specific issue is now out of date. For my latest plans, see https://github.com/JackKelly/light-speed-io/milestone/1

@github-project-automation github-project-automation bot moved this from Todo to Done in light-speed-io Jan 30, 2024