
Conversation

@krynju (Member) commented Jul 16, 2021

TODO:

  • Tables.jl-compatible constructors (see the usage sketch after this list)
  • Constructor for pre-partitioned data (prototype for now; needs the planned storage API for a full implementation)
  • fetch(d::DTable) into any Tables.jl data structure (currently materializes back into a DataFrame)
  • Implement the Tables.jl interface for the DTable (and figure out what makes sense to implement and what doesn't)
  • add more TODO items
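For illustration, here is roughly what the constructor/fetch round-trip described above could look like once the Tables.jl constructors land. The positional row-count chunk size and the exact call shape are assumptions, not the final API:

```julia
using Dagger, DataFrames

# Build a DTable from any Tables.jl-compatible source, partitioned by row count
# (the current prototype uses a row-count chunk size).
df = DataFrame(a = 1:100, b = rand(100))
d  = Dagger.DTable(df, 25)   # assumed constructor signature: (table, chunksize)

# fetch materializes the distributed table back into a single table
# (currently a DataFrame, per the TODO above).
df2 = fetch(d)
```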

@jpsamaroo (Member) left a comment

Great work so far!

@jpsamaroo (Member)

@krynju when you get the chance, let's try benchmarking this against raw DataFrames and see how we do. I suspect we'll probably want some scheduler changes to optimize concurrent thunk submission (which currently takes a global scheduler lock).

@ExpandingMan (Contributor) commented Jul 17, 2021

Hey guys, glad to see this exciting work, thanks for all the effort!

I feel a little awkward making such a radical suggestion as merely an interested observer, but are you sure it wouldn't be better to split this into a new package that depends on Dagger, rather than relying on Requires.jl? That approach has worked out quite well in the JuliaData ecosystem, where many packages have delightfully minimal dependencies: several extensions to DataFrames.jl live outside that package (such as DataFramesMeta.jl), and IO packages such as CSV.jl rely only on the Tables.jl interface despite being used with DataFrames.jl in the majority of cases. There are even alternative table packages, such as TypedTables.jl, which implement the Tables.jl interface and are completely independent in terms of dependencies. If I make any significant contribution to the distributed-tables work myself, it will likely be to extend Parquet.jl to be compatible with what you're building here, and it's not yet clear to me whether that will require a dependency on the DTable code at all; perhaps it will only need Dagger, or even just Tables.jl.

I realize that this isn't a lot of code right now, so splitting it may seem premature, but it could easily grow into an enormous code base over time. Splitting it keeps the dependency logic much more straightforward, and it is usually easier to split a project in its nascent stages than to do a massive reorganization later on.

Another thing that separate packages make much easier to manage is exports: it may be inappropriate to export some table-specific functions from Dagger (because they may clash with common symbols), but doing so could make more sense from a tables-specific package.

Lastly, I'm not entirely sure how Requires.jl interacts with other compilation schemes such as sys-image compilation via PackageCompiler.jl, but it has caused me problems in the past, so this might be worth considering when deciding how heavily to rely on it.
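(For reference, the conditional-loading pattern being discussed looks roughly like the sketch below; the included file name is made up, and this is only meant to illustrate the trade-off, not the code in this PR.)

```julia
# Requires.jl pattern: the DataFrames-specific glue code is only loaded once
# the user imports DataFrames (UUID shown is DataFrames.jl's package UUID).
using Requires

function __init__()
    @require DataFrames="a93c6f00-e57d-5684-b7b6-d8193f3e46c0" begin
        include("table/dataframes_glue.jl")   # hypothetical file name
    end
end
```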

@jpsamaroo (Member)

I'm fine with moving this functionality out once it's ready to go; in the meantime we can keep it here for easy access to CI, and for testing in case we need/want to make scheduler changes as part of this work.

@xiaodaigh

Yeah, I think Dagger can exist without a table, so I agree the table functionality should eventually be moved out as well.

For practical reasons, though, it's probably fine to keep it here for now, for CI and so on.

@krynju krynju changed the base branch from master to jps/eager-mem-leak-fix July 18, 2021 11:04
@krynju krynju changed the base branch from jps/eager-mem-leak-fix to master July 18, 2021 11:04
@vchuravy (Member)

One alternative is to make it a subpackage: the CI setup is slightly annoying, but it makes joint development easier without bringing in unnecessary dependencies or using Requires.jl.

@krynju krynju changed the base branch from master to jps/eager-mem-leak-fix July 19, 2021 19:11
@krynju krynju changed the base branch from jps/eager-mem-leak-fix to master July 19, 2021 19:11
@krynju krynju force-pushed the kr/dtable-impl branch 2 times, most recently from eb8e56c to 49285b3 Compare July 19, 2021 20:39
@krynju (Member, Author) commented Jul 20, 2021

@jpsamaroo I've been thinking about how to manage the chunks that contain the partitions in a scenario where the table is bigger than the available memory.
At this stage the implementation doesn't care and loads all partitions into memory, so the process will naturally die once it goes above the limit (at least when using threads; I didn't check with processes).

My idea for now is to first express the chunk size as an amount of memory instead of a row count, to enable the rest of the functionality (a rough sketch of what I mean follows the list below). Then there are two scenarios:

  1. Tables loaded using the Tables.jl interface - the table will need to be partitioned within the constructor according to the chunk size. There will have to be a mechanism for caching some chunks to disk and loading them back on demand.
  2. Tables initialized from a set of files - these will initially be partitioned according to the files provided and loaded into memory on demand. Then, in the case of any repartitioning (grouping, sorting, etc.), the table will have to be treated like the table from point 1, so it will be partitioned and cached to disk normally.
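For illustration, a minimal sketch of the kind of byte-based chunk sizing I have in mind, assuming a Tables.jl-compatible source (all names here are made up, and a real implementation would sample rather than materialize everything):

```julia
using Tables

# Estimate how many rows fit into a target chunk size in bytes by materializing
# the source as columns and measuring it with Base.summarysize.
function rows_per_chunk(source, target_chunk_bytes::Integer)
    cols  = Tables.columns(source)                 # column-oriented view of the source
    nrows = length(Tables.getcolumn(cols, 1))
    nrows == 0 && return 1
    bytes_per_row = Base.summarysize(cols) / nrows
    return max(1, floor(Int, target_chunk_bytes / bytes_per_row))
end
```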

Both of these assume we would have some visibility into the memory available to a Processor, in order to figure out when to cache chunks and when not to load them into memory. I would probably declare/infer that information at the OSProc level, so that memory availability can be handled even in a Distributed environment. A separate problem is where to cache the data: a local environment is easy, but with Distributed we should probably cache the table chunks in some commonly accessible location.

I'm wondering how much of this should be part of the Dagger internals/scheduler and what should be part of the DTable. I could imagine a CachableChunk type for which you could define cache/restore functions, with the scheduler figuring out when to call cache and restore on these in order to make space in physical memory and prepare the chunks for the tasks to execute. With something like that, the DTable wouldn't have to do much work to provide out-of-core processing.

What's your take on this?

@jpsamaroo (Member)

I've thought about swap-to-disk a lot recently; some initial thoughts are here: #214 (comment), which sound quite similar to what you're suggesting.

I'm wondering how much of this should be a part of the Dagger internals/scheduler and what should be a part of the DTable.

This is definitely a thing that the scheduler should know about and handle automatically. It's also relevant to the DArray, and generally to other kinds of operations that generate lots of data.

I could imagine having a CachableChunk type for which you could define cache/restore functions and the scheduler figuring out when to call the cache and restore on these in order to make space in the physical memory and prepare the chunks for the Tasks to execute.

We already have cache/restore in the form of the checkpointing available to thunks (#183), although what you're describing is more of an automatic operation, whereas checkpointing is more manual (although that can change). Regardless, I don't think we need a new chunk type; the existing Chunk has a handle field that can be a MemPool.FileRef, or something else of our own construction.

With something like that the DTable wouldn't have a lot of work to do to provide out of core processing.

It's my hope that we can handle all of these details in the scheduler, so that any Dagger computation can benefit from swap-to-disk. I don't think exposing it to the DTable would be necessary; any knobs could be controlled via multiple dispatch on the chunked data type.

@krynju (Member, Author) commented Jul 21, 2021

So essentially there are three separate topics:

  1. Adding the scheduler memory awareness and management
  2. Chunk with a handle type specialized for loading parts of a table initially stored as a set of files reachable by some Processors. That would come with a user-defined loader, so any input format can be used as long as it ultimately produces a Tables.jl-compatible source that the DataFrame can ingest.
  3. Chunk with a handle type specialized for storing table partitions in memory, with specialized cache/restore functionality, so that the scheduler can manage these according to the memory limits of the Processors in use (other chunks would just be serialized/deserialized for memory-management purposes; here we could optimize specifically for the partitions).

Points 2 and 3 are separate because, in my mind, a chunk from point 2 becomes a chunk from point 3 after being successfully loaded once (and if the partition gets dropped accidentally, it can be loaded again from that source).

@jpsamaroo (Member)

Chunk with a handle type specialized for loading parts of a table initially stored as a set of files reachable by some Processors. That would come with a user-defined loader, so any input format can be used as long as it ultimately produces a Tables.jl-compatible source that the DataFrame can ingest.

Chunk with a handle type specialized for storing table partitions in memory, with specialized cache/restore functionality, so that the scheduler can manage these according to the memory limits of the Processors in use (other chunks would just be serialized/deserialized for memory-management purposes; here we could optimize specifically for the partitions).

I think we need that handle type (which could potentially just be MemPool.FileRef), and possibly also some way of expanding the Processor abstraction to reflect persistent storage. So the delineation would be:

  • Chunk with a handle of type MemPool.DRef (what's currently used to represent in-memory data), and a processor that the data is associated with.
  • Chunk with a handle of type MemPool.FileRef, and a "processor" representing the storage format of the data.

Here, the DRef and FileRef implement the means to retrieve stored data: MemPool.poolget(::DRef) returns the data stored in memory, while MemPool.poolget(::FileRef) should ideally return a representation of the data as stored on the file system (probably an open file descriptor on the host system). (Note that MemPool.poolget is called by Dagger.move by default.) In the case of FileRef, it should not actually read the data into memory: that should be the job of Dagger.move, which will convert and serialize the data as needed. For a FileRef, doing a Dagger.move from some "storage processor" to a ThreadProc would imply reading the data into memory on that thread (really, just into the associated Julia process) in the most efficient manner possible. The combination of source processor, destination processor, and data type should direct everything necessary to move data from file to memory, and back.

So what's missing here, in my mind, is an extra abstract type Storage, whose subtypes can be used as the processor field of a Chunk, representing that the data is associated with a storage device rather than a computational device. It wouldn't be legal to execute code against a Storage type, so there'd be an extra step required to make a new Chunk with a real Processor (in other words, to move it into memory).
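To make that concrete, here is a rough sketch of how such a Storage "processor" plus a FileRef-handled Chunk might look. None of this exists yet: the subtyping of Processor, the CSVStorage name, the move method, and the FileRef field access are all assumptions made for illustration.

```julia
using Dagger, MemPool
using CSV, DataFrames

# Hypothetical: storage "processors" that data can be associated with, but
# that code cannot execute on.
abstract type Storage <: Dagger.Processor end

struct CSVStorage <: Storage
    dir::String   # directory where the CSV partitions live
end

# Hypothetical move method: pulling a FileRef-backed chunk from its storage
# "processor" onto a compute processor reads the file into memory.
function Dagger.move(from::CSVStorage, to::Dagger.ThreadProc, ref::MemPool.FileRef)
    # Assumes FileRef exposes the file path via `ref.file`.
    return CSV.read(ref.file, DataFrame)
end
```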

Adding the scheduler memory awareness and management

Given the above, the scheduler could be taught to move data from disk to memory when the data is needed for a computation. It could also be taught that, when bringing a new piece of data into memory, if the estimated memory requirement for that data is greater than the memory limit of the processor (CPU RAM, GPU VRAM, etc.), then other chunks that are currently in memory will first need to be moved to disk and cleared from memory.

An inability to perform this operation (say, because disk space ran out) would result in the scheduler setting an error state on that task of the DAG and anything that depends on it. In an interactive setting, the user could clean up more disk space or memory, and then re-run the computation and the scheduler should then retry from where it left off.

So what will be the format on disk for some arbitrary blob of data currently in-memory? This should probably be determined by a hypothetical Dagger.default_storage_type(X), which would return a Storage subtype that can feasibly store X. Dagger would try to find a matching Storage instance, and if it succeeds, the scheduler could use that concrete instance to initiate a transfer into that storage location. The result of that transfer will be a MemPool.FileRef, which can later be used by the same Storage instance to retrieve the data from disk.

So how does all this work for the DTable? The DTable will need to auto-detect (based on the input table type or some other mechanism) which Storage subtype is appropriate to represent the data as it is passed in, and then query Dagger for a matching storage instance. That instance can be used to construct any chunks associated with the input table, those chunks can safely be passed as thunk arguments, and the scheduler will figure things out. Of course, the scheduler will then be free to temporarily send that data to disk in whatever format it wants (probably not CSV, I would hope).

Thoughts?

@krynju (Member, Author) commented Jul 22, 2021

Makes sense to me, given my current understanding of the internals.

I do have three questions, though:

  1. Should the DTable storage stay as it is? Currently it's a Vector{Dagger.EagerThunk}, and right after creation each thunk is a simple Dagger.@spawn (()->df)(). If some operation is applied, the thunk represents that operation instead. If I were to replace the storage with a Vector{Chunk}, I would have to unwrap the thunks after some table operations to retrieve the chunk and place it there, but I suppose that would break the execution graph if another operation were then done on that retrieved chunk.
  2. On creation (keeping the new interface in mind), I would have to explicitly create a chunk with its .processor field set to the Storage instance the partition would be loaded from. What then? Should I wrap it in a simple thunk as well and keep it like that in the DTable vector, e.g. Dagger.@spawn collect(Dagger.tochunk(<partition>))?
  3. If we ever wanted to give the partitions' cache/restore mechanism special treatment (maybe compression), how would we specialize that? By implementing a Dagger.move method for these chunks? How would we figure out that a given chunk is one of these partition chunks? A new handle type?

@jpsamaroo (Member)

1. Should the DTable storage stay as it is? Currently it's a `Vector{Dagger.EagerThunk}`, and right after creation each thunk is a simple `Dagger.@spawn (()->df)()`. If some operation is applied, the thunk represents that operation instead. If I were to replace the storage with a `Vector{Chunk}`, I would have to unwrap the thunks after some table operations to retrieve the chunk and place it there, but I suppose that would break the execution graph if another operation were then done on that retrieved chunk.

I would make it a Vector{Union{Dagger.EagerThunk, Dagger.Chunk}}. This way, if you already have the data in-hand you can place its Chunk straight in there, but if it's an operation on other data then the EagerThunk goes there. Also, Chunk data should be immutable, so operations should not be modifying data stored in a Chunk in-place. We would need #186 before allowing any form of mutation.
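As a small illustration of that layout (the struct and field names here are just a sketch, not the actual DTable definition):

```julia
using Dagger, DataFrames

# Sketch: partitions are either materialized data (Chunk) or pending
# operations on other data (EagerThunk).
struct DTableSketch
    chunks::Vector{Union{Dagger.EagerThunk, Dagger.Chunk}}
end

df = DataFrame(a = 1:10)
in_hand = Dagger.tochunk(df)                        # data already in hand -> Chunk
derived = Dagger.@spawn filter(r -> r.a > 5, df)    # operation on other data -> EagerThunk

t = DTableSketch([in_hand, derived])
```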

2. On creation (keeping the new interface in mind), I would have to explicitly create a chunk with its .processor field set to the Storage instance the partition would be loaded from. What then? Should I wrap it in a simple thunk as well and keep it like that in the DTable vector, e.g. `Dagger.@spawn collect(Dagger.tochunk(<partition>))`?

It would be placed straight into the DTable as-is.

3. If we ever wanted to give the partitions' cache/restore mechanism special treatment (maybe compression), how would we specialize that? By implementing a `Dagger.move` method for these chunks? How would we figure out that a given chunk is one of these partition chunks? A new handle type?

At-rest compression of data would be best handled by a dedicated Storage type, I would think. The storage object will be passed to Dagger.move by default, so you can implement custom serialization behavior there. This is also making me think that we might want to model multiple "levels" of storage, where data can transition into and out of deeper and deeper levels of cold storage (uncompressed memory -> compressed memory -> compressed on SSD -> compressed on HDD -> compressed on tape 😛 ). Without that, the scheduler would just randomly place data in either compressed memory or compressed disk, which isn't great behavior. With the right handling of space available at each level, the scheduler could even skip some steps and go right to a very large, slow storage when memory and disk space are tight.
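Roughly, the "levels" idea could be modeled as an ordered set of tiers that the scheduler walks when it runs out of space at one level. All names below are hypothetical; this is just to illustrate the shape of the abstraction:

```julia
# Hypothetical storage tiers, ordered from fastest/smallest to slowest/largest.
abstract type StorageTier end

struct MemoryTier           <: StorageTier end
struct CompressedMemoryTier <: StorageTier end
struct DiskTier             <: StorageTier
    path::String
end

# The scheduler could walk this list and pick the first tier with enough free
# capacity (capacity tracking itself is out of scope for this sketch).
const STORAGE_TIERS = Any[MemoryTier(), CompressedMemoryTier(), DiskTier("/tmp/dagger-swap")]
```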

@shashi (Collaborator) commented Jul 26, 2021

Are there any plans to support indexing? How would groupby and join work efficiently? I spoke to @quinnj last year about possibly adding partition and indexing helpers to Tables.jl; this might be a good time to restart that discussion.

@krynju (Member, Author) commented Jul 26, 2021

Are there any plans to support indexing? How would groupby and join work efficiently?

Indexing - yes, I think it will be of great use for optimizing some operations.

Groupby and join I really need to brainstorm later, once I'm done testing the ground with the "simple ops".
On one hand, the base functionality of repartitioning (shuffle) and sorting is definitely a good thing to have, and with that in place, implementing some simple groupby and join algorithms would be possible.
On the other hand, I don't want this project to be a Spark rewrite - more a Dask competitor and an improved batch file processor with an extra table abstraction.

In the context of these operations I'm mostly worried about memory management. Once a shuffle/sort or any other transformation happens, we'll be duplicating the table in memory. With no stable execution plan it's probably harder to decide when to free the previous chunks - especially in an interactive session.

@jpsamaroo (Member)

We might want to consider what's available with Tables and TableOperations now: https://youtu.be/Ryp0F0dHnLA

@krynju krynju changed the title [wip] DTable implementation DTable implementation Jul 28, 2021
@krynju krynju force-pushed the kr/dtable-impl branch 2 times, most recently from 6af7636 to a18aa37 Compare July 28, 2021 22:47
@krynju (Member, Author) commented Jul 28, 2021

I've seen that part of the interface, and it will be very useful here, both as a consumer of it and as a source.

I've just added some small docs, fixed some consistency issues, and put the DataFrames dependency behind Requires.jl. I think I can consider this initial scope ready as it is.

I think the next TODOs can be summarized by this list:

  • Tables.jl (being a source) - figure out which parts of the interface to implement, and how to implement them in a way that makes sense.
  • Tables.jl (consuming the interface) - add a constructor that consumes the partitions interface.
  • Prepare a strategy for tackling the repartitioning-related operations (shuffle, sort, groupby, join) and assess their memory consumption.
  • select and transform
  • Implement the Storage API in the DTable once it's ready
  • Figure out indexing
  • Figure out potential schema usage/discovery and whether it can be useful at all, or whether to just keep the schema unknown
  • Add some convenience functions to inspect the contents of the DTable, such as getindex, getcolumn, first, last, describe, schema information, length, partition information, memory placement across Processors, cached percentage, etc.
  • Consider some form of DResult structure for returning the results of operations that have been applied per chunk but not yet aggregated. This would be useful in map when you don't want to immediately get a fully vcatted vector on the master processor (not so much in reduce); see the sketch after this list.
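A rough sketch of that last DResult idea (all names are hypothetical; this is just to pin down the intent):

```julia
using Dagger

# Hypothetical: hold one per-partition result per thunk instead of eagerly
# vcat-ing everything onto the master process.
struct DResultSketch
    parts::Vector{Dagger.EagerThunk}
end

# Only materialize and concatenate when the user actually asks for it.
materialize(r::DResultSketch) = reduce(vcat, map(fetch, r.parts))
```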

@krynju krynju marked this pull request as ready for review July 28, 2021 22:48
@krynju krynju requested a review from jpsamaroo July 28, 2021 22:53
@quinnj (Contributor) commented Aug 1, 2021

This is exciting! Feel free to ping me on Tables.jl review stuff; happy to help, or even to brainstorm the implementation. Happy to chat on Slack or Discourse.

@jpsamaroo (Member) left a comment

Looking good, although we need to be careful about using closures, since they can add unacceptable compile time costs.

@krynju (Member, Author) commented Aug 2, 2021

@jpsamaroo Applied the requested changes.
CI is failing due to an unrelated issue on master.

@jpsamaroo (Member) left a comment

Great work! Just a few nitpicks and comments, and then I'd be happy to merge this! 😄

@jpsamaroo jpsamaroo merged commit e6b3cdc into JuliaParallel:master Aug 5, 2021
@jpsamaroo (Member)

Thanks a ton! I'm looking forward to seeing this develop 😄
