Try interleaving compute with IO #37

Closed
JackKelly opened this issue Jan 29, 2024 · 6 comments
Labels: enhancement (New feature or request)

JackKelly (Owner) commented Jan 29, 2024

Maybe it's as simple as something like this (adapted from https://ryhl.io/blog/async-what-is-blocking/#the-rayon-crate):

let future_processed_data: Vec<_> = files
    .map(|file| {
        let store = store.clone(); // assuming `store` is cheaply cloneable (e.g. an Arc)
        tokio::spawn(async move {
            let data = store.get(file).await;

            // We now have the data! So let's submit to Rayon's threadpool:
            let (send, recv) = tokio::sync::oneshot::channel();
            rayon::spawn(move || {
                let result = process(data);
                let _ = send.send(result);
            });
            recv.await
        })
    })
    .collect();
// At this point, the loading and processing should be running interleaved.

// Now let's wait for all the IO and processing to complete:
let processed_data: Vec<_> = futures::future::join_all(future_processed_data)
    .await
    .into_iter()
    .map(|join_result| join_result.unwrap().unwrap()) // unwrap the JoinHandle result, then the oneshot result
    .collect();

UPDATE 2024-03-13: I'm thinking of dropping async/await. See #93. I also need to think about how to keep using object_store for cloud storage (e.g. have an adaptor which converts object_store reads into a channel of data).
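Something like this, perhaps (just a sketch; reads_to_channel and its serial read loop are placeholders, not this crate's API):

use std::sync::Arc;
use object_store::{path::Path, ObjectStore};

/// Bridge async object_store reads into a plain (blocking) channel of bytes,
/// so the rest of the pipeline can stay free of async/await.
fn reads_to_channel<S: ObjectStore>(
    store: Arc<S>,
    paths: Vec<Path>,
) -> crossbeam::channel::Receiver<object_store::Result<bytes::Bytes>> {
    let (tx, rx) = crossbeam::channel::bounded(16);
    // Run the async reads on a small, private tokio runtime in a background
    // thread; consumers just block on the channel.
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build tokio runtime");
        rt.block_on(async {
            // Note: this issues one read at a time for simplicity; a real
            // adaptor would want several reads in flight concurrently.
            for path in paths {
                let result = match store.get(&path).await {
                    Ok(get_result) => get_result.bytes().await,
                    Err(e) => Err(e),
                };
                if tx.send(result).is_err() {
                    break; // The receiver hung up; stop issuing reads.
                }
            }
        });
    });
    rx
}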

UPDATE 2024-03-15: In issue #104, I'm sketching out how users' code would work if/when we drop async.


maximedion2 commented

Hi there! I know you created tons of tasks for this project, which is still quite experimental, and I see you haven't updated this one in over a month, so apologies if it's not something you're actively working on right now. But I'm curious about the high-level implications here, specifically about reading Zarr chunk files asynchronously while having to deal with decompression (and a few other Zarr-specific data manipulation steps), which is very much "blocking".

Have you put a lot of thought into how you want io_uring to work in the context of compressed files? (I assume you have, as Zarr relies heavily on compression, by default at least.) I'm asking on this sub-task because it seems that's what you had in mind with the interleaving above. Is that something you've decided against in the last month or so, or do you think it's still a promising approach? (In your example above, I assume store would be something that uses io_uring under the hood.)

For context, here's me rambling about this on an issue for the project I'm working on, datafusion-contrib/arrow-zarr#2 (comment), where I'm reaching the point where this potentially matters (I have an implementation that I'm not quite happy with right now, and I'm trying to determine what I want to do short and longer term). I understand that this question isn't necessarily directly related to the io_uring effort, it's just tangential, and I greatly appreciate any thoughts and feedback!

JackKelly (Owner, Author) commented Mar 13, 2024

Hi!

My intention is definitely to allow for IO and compute to run in parallel 🙂. For example, say you have 16 logical CPU cores, I'd like the code to decompress 16 chunks in parallel (across those 16 CPU cores) whilst the IO subsystem loads more chunks from disk.

In fact, whilst the majority of my code so far has been on low-level IO (using io_uring), my expectation is that, in a year or two, the majority of the code in this repo (or maybe a new repo) will be for scheduling & running compute. I'd like the code to automatically optimise a plan for computation on small chunks of data. The plan should minimise the number of round-trips to RAM, and hence maximise the use of the CPU cache. My focus will especially be on compute for labelled multi-dimensional arrays (of the type that xarray handles today).

For example, a common use-case for us at Open Climate Fix is that we'll want to train an ML model on multiple Zarr datasets: e.g. a satellite dataset and a numerical weather prediction (NWP) dataset. During ML training, we need to load aligned, random crops from these datasets (e.g. one "random crop" might be 128 km x 128 km centered over London, for all 24 hours of 1st January 2022. And we need to load that same "crop" from the satellite dataset and the NWP dataset). The satellite data might need to be re-projected on the fly. And both the NWP dataset and satellite dataset might need to be normalised (so each dataset has a mean of zero and a standard deviation of one). My hope is to build a system that can automatically plan how best to schedule and run the IO and computation (on a single machine), such that each chunk only needs to be loaded into CPU cache once, and then all the computation steps are carried out on that chunk whilst it's in CPU cache (decompression, reprojection, normalisation, etc., and finally moving the fully processed data into a merged numpy array).

For my crate, I'm actually considering dropping async/await for the io_uring part, and instead using a Channel to send tasks to a work-stealing threadpool (see #93). I'll see how that affects performance and code complexity this week.

I'll hopefully start working on implementing decompression in the next few weeks.

If you do want to mix tokio (for async IO) and rayon (for concurrent compute) then Alice Ryhl shows a good way of implementing that in her blog. Her approach is to spawn a compute task to rayon's threadpool, and use a tokio::sync::oneshot::channel to send the result back to tokio. The end result is that the compute task won't block a thread in tokio's threadpool.
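Roughly, that pattern looks like this (just a sketch of her approach wrapped in a hypothetical helper; spawn_on_rayon isn't part of tokio or rayon):

// Run a CPU-bound closure on rayon's threadpool and await its result from
// tokio, without blocking a tokio worker thread.
async fn spawn_on_rayon<F, T>(func: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (send, recv) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        // The CPU-bound work happens on rayon's threadpool...
        let result = func();
        // ...and the result is sent back to the waiting tokio task. The send
        // only fails if the receiver was dropped, which we can safely ignore.
        let _ = send.send(result);
    });
    // Awaiting the oneshot receiver suspends this task; it doesn't block.
    recv.await.expect("the rayon task panicked before sending a result")
}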

martindurant commented

> For my crate, I'm actually considering dropping async/await for the io_uring part, and instead using a Channel to send tasks to a work-stealing threadpool (see #93). I'll see how that affects performance and code complexity this week.

This is exactly where I was going; anything on the Python side can have a pollable but not awaitable future-like object (more like the concurrent.futures implementation, similar to dask) and no Python-side asyncio.

> want to mix tokio (for async IO) and rayon (for concurrent compute)

IIRC, spawning (CPU-bound) worker threads in tokio async isn't hard, and you don't necessarily need a complex pool to manage it. Tokio has the awesome feature that the async event loop can be claimed by any thread that needs it; but likely a dedicated IO thread is still best, with worker threads accepting pipelined jobs from channels/queues.
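For illustration, a rough sketch of that shape (decompress is a stand-in for the real CPU-bound work; the multi-consumer channel comes from crossbeam and the pool is sized with num_cpus):

use std::{path::PathBuf, thread};

// Stand-in for the real CPU-bound work (decompression, filters, etc.).
fn decompress(bytes: &[u8]) -> Vec<u8> {
    bytes.to_vec()
}

fn run_pipeline(chunk_paths: Vec<PathBuf>) {
    // Bounded channel, so the IO thread gets backpressure if workers lag.
    let (tx, rx) = crossbeam::channel::bounded::<Vec<u8>>(16);

    // Dedicated IO thread: read chunks and push them into the channel.
    let io_thread = thread::spawn(move || {
        for path in chunk_paths {
            let bytes = std::fs::read(&path).expect("read failed");
            if tx.send(bytes).is_err() {
                break; // All workers have hung up.
            }
        }
        // Dropping `tx` here closes the channel, letting the workers exit.
    });

    // Worker threads: pull chunks off the channel and do the CPU-bound work.
    let workers: Vec<_> = (0..num_cpus::get())
        .map(|_| {
            let rx = rx.clone();
            thread::spawn(move || {
                for bytes in rx {
                    let _decompressed = decompress(&bytes);
                }
            })
        })
        .collect();

    io_thread.join().unwrap();
    for worker in workers {
        worker.join().unwrap();
    }
}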

maximedion2 commented Mar 17, 2024

> My intention is definitely to allow for IO and compute to run in parallel 🙂. For example, say you have 16 logical CPU cores, I'd like the code to decompress 16 chunks in parallel (across those 16 CPU cores) whilst the IO subsystem loads more chunks from disk.

Right, one way or the other, that overall behavior has to happen; otherwise it defeats the purpose of doing work asynchronously. But I guess there are two ways this could happen at a high level: you could return the raw data and let the user send tasks to a thread pool, or you could combine processing tasks and IO under the hood, so users would provide where they want to read from, along with the tasks, to the entry function. You're leaning towards the latter? But even then, ultimately users should still be able to efficiently post-process whatever they get from this crate, if they need to?

For instance, in your example, normalizing the data can't be done with a single CPU cache load per chunk, since the normalization factors depend on the whole data set? That could be handled under the hood via a call to a special function that still minimizes CPU cache loads, but could also be achieved via one "normal" call that reads and decompresses the chunks, and then the user could normalize the data however they want after it's handed to them?

In any case, yes I remembered seeing the above blog and your suggestion for tokio + rayon for IO/compute of chunks in one of the tasks you created, so when I inevitably ran into that situation I thought I would give that a try. Let's see how well that works!

JackKelly (Owner, Author) commented

> you could return the raw data and let the user send tasks to a thread pool, or you could combine processing tasks and IO under the hood, so users would provide where they want to read from, along with the tasks, to the entry function. You're leaning towards the latter?

My plan is to write multiple, interconnected crates. Each crate will be fairly small, and each crate will operate at a single level of abstraction. These crates will be hosted in this git repo, in a flat crate structure.

I haven't completely figured out exactly how to divide the work between these crates. But the basic idea is that there will be a crate which just performs IO for local storage using io_uring (no compute). Another crate will perform parallel computation on these chunks. Another crate will provide a Zarr front-end. etc.

I'll sketch out my current plans for these crates in issue #94.

So you'll be able to pick-and-choose 🙂

JackKelly (Owner, Author) commented

I'm going to close this issue for now, because the current plan is quite different from the plan outlined at the top of this issue. The new plan is to split the IO and compute into separate crates. And to use a crossbeam::channel to send chunks from IO to a compute threadpool. See https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/planned_design.md
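Very roughly, the shape of the new design is something like this (a sketch only, not the actual code from planned_design.md; Chunk and decompress are placeholders):

use rayon::iter::{ParallelBridge, ParallelIterator};

// Placeholders for illustration.
type Chunk = Vec<u8>;
fn decompress(chunk: Chunk) -> Vec<u8> {
    chunk
}

fn main() {
    // IO side: a dedicated thread (ultimately an io_uring event loop) pushes
    // loaded chunks into a bounded crossbeam channel.
    let (tx, rx) = crossbeam::channel::bounded::<Chunk>(16);
    std::thread::spawn(move || {
        for i in 0..64u8 {
            let chunk = vec![i; 1024]; // Stand-in for "load chunk from disk".
            if tx.send(chunk).is_err() {
                break;
            }
        }
        // Dropping `tx` closes the channel so the compute side can finish.
    });

    // Compute side: drain the channel in parallel on rayon's work-stealing
    // threadpool, so decompression runs while more chunks are being loaded.
    rx.into_iter().par_bridge().for_each(|chunk| {
        let _decompressed = decompress(chunk);
        // ...further per-chunk processing would go here...
    });
}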

I'll mark this issue as "closed: not planned", BUT I do very much plan to enable parallel computation on chunks! It's just that I'll use a rather different design from the one outlined above!

JackKelly closed this as not planned on Apr 3, 2024