Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++][Skyhook] Refactor Skyhook file format to use a higher level interface #40583

Open
drin opened this issue Mar 15, 2024 · 10 comments
Open

[C++][Skyhook] Refactor Skyhook file format to use a higher level interface #40583

drin opened this issue Mar 15, 2024 · 10 comments

Comments

@drin
Copy link
Contributor

drin commented Mar 15, 2024

Describe the enhancement requested

The enhancement is to refactor the Skyhook file format (a custom target of the Arrow dataset API) to loosen the coupling with the Arrow dataset API.

The request was originally made via the mailing list (link) and mentions that improvements to the dataset API are not possible while maintaining the Skyhook file format in its current form. A reference is also made to Acero and its usage of substrait, perhaps to contextualize a preferred approach.

Component(s)

C++

@bkietz
Copy link
Member

bkietz commented Mar 19, 2024

The basic features of skyhook are a configurable server side scan/compute, and efficient transport of resulting buffers to the client. This does not correspond to a file format; file formats compartmentalize reading data files independent of I/O. A CSV formatted file might reside in a string literal or an S3 bucket, but skyhook's files are actually IPC or parquet and must reside on a ceph server. Writing skyhook as a file format therefore breaks conventions and contracts relied on by the dataset API.

However in the context of acero skyhook has a very natural structure:

  • A source ExecNode on the client which requests specific scan/compute from the server, forwards returned batches into an ExecPlan, and mediates back pressure.
  • A ceph CLS which maintains a catalog of data files for scanning, instantiates server side ExecPlans to fulfil client requests, and responds to back pressure from the client.

Some design notes which follow from the above:

Currently skyhook supports only predicate and projection pushdown, but there's no reason to forbid specifying arbitrary computation to the server. For example an aggregation performed on the server could save significant network overhead when the result is small. Although Arrow's Expressions are serializable this feature isn't designed for stability. Substrait is a more capable and stable tool intended for transmitting arbitrary execution plans. It'd be worthwhile to replace projection/filter Expression serialization with a substrait subplan to be executed on the server. This should actually simplify skyhook's server side code since it can reuse arrow::engine::SerializePlan/DeserializePlan.

Furthermore, skyhook need not be bound to a single file format or to output small enough to be materialized as a single Table. On the server an arbitrary collection of data files can be wrapped into a catalog of datasets. For example, a collection of csv and parquet data files could be wrapped in a UnionDataset, cataloged as a single table nyc-taxi. The server side complexity of a dataset can be invisible to client configuring a scan; clients should only need to reference these datasets by name and schema, using a substrait NamedTable message.

Instead of having each skyhook scan negotiate a new connection, the connection to a ceph server should be factored out to an independent client side object. This way validation of the connection, checking for registration of a compatible CLS, and caching of the server's catalog can all be compartmentalized into a single construction.

The purpose of the arrow monorepo is cross platform library code, whereas skyhook is comprised of a specialized service and client pair. Skyhook should therefore probably be relocated outside the arrow monorepo. Moreover skyhook is compelling as a demonstration of both the extensibility and usability of acero from third party code, which will be emphasized by keeping it in an independent project.

Some example code illustrating what using a skyhook source node on the client could look like:

// This only needs to be done once:
ARROW_ASSIGN_OR_RAISE(auto skyhook_connection, MakeSkyhookConnection(
    config_path, data_pool, user_name, cluster_name));

// Display the catalog:
for (auto [name, schema] : skyhook_connection.catalog()) {
  std::cout << name << ": " << schema->ToString() << std::endl;
}

Result<std::shared_ptr<Table>> GetTripCostsForYear(int year) {
  // Declare a server side plan which will aggregate trip costs
  // grouped by tag in the specified year:
  auto server_side_plan = Declaration::Sequence({
      {"named_table", skyhook_connection.MakeNamedTableOptions("nyc-taxi")},
      {"filter", FilterNodeOptions(equal(field_ref("year"), literal(year)))},
      {"aggregate", AggregateNodeOptions({{"hash_sum", "cost", "cost_sum"}}, {"tag"})},
  });
  
  // The client side plan then uses the server side plan to configure its source node:
  std::shared_ptr<Table> output_table;
  auto client_side_plan = Declaration::Sequence({
      {"skyhook_source", SkyhookSourceNodeOptions(server_side_plan, skyhook_connection)},
      {"table_sink", TableSinkNodeOptions(&output_table)},
  });

  // Run the plan, return the resulting table
  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
  ARROW_RETURN_NOT_OK(client_side_plan.AddToPlan(plan.get()));
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  return output_table;
}

A summary of the additions necessary to make that work, client side:

  • SkyhookSourceNode (subclass of ExecNode)
    • in StartProducing() override: serialize server_side_plan to substrait and send to server
    • receives ceph::bufferlists asynchronously, converts to ExecBatches, then pushes to the next node with this->output()->InputReceived()
    • forwards PauseProducing() and ResumeProducing() calls to the server
  • SkyhookSourceNodeOptions (subclass of ExecNodeOptions)
  • in client initialization: register SkyhookSourceNode factory named "skyhook_source"

... and on the server side:

  • Maintain a catalog mapping table names to datasets, accessible as a NamedTableProvider
  • Respond to new connections with a summary of the catalog, mapping names to schemas
  • Receive and excecute server_side_plans from clients
    • deserialize from substrait
    • construct and run ExecPlan
    • convert each generated ExecBatch to ceph::bufferlist and transmit to the client
    • pause and resume when indicated by the client

@drin
Copy link
Contributor Author

drin commented Mar 19, 2024

Wow, thanks so much for the write up: it is well written and gives me clear ideas of the direction you're proposing and where the drawbacks of the Skyhook file format are.

I want to clarify (or maybe pushback?) on a few things, but overall I see a path forward with minimal changes to your suggestions.

The basic features of skyhook are a configurable server side scan/compute, and efficient transport of resulting buffers to the client. This does not correspond to a file format; file formats compartmentalize reading data files independent of I/O.

I believe that the Skyhook file format (as implemented) is a contract that when files are written in a given format (arrow IPC) using a standard posix filesystem, they can be read using a different interface (arrow dataset). Because two separate I/O interfaces are used, the file format accommodates that in the contract. It is akin to saying that if you write files in a particular way, you can process the blocks of the file without changing how you write files.

Writing skyhook as a file format therefore breaks conventions and contracts relied on by the dataset API.

Yes, this part I understand. I would just add that this is because ceph (which skyhook is an extension for) has a broader definition of what a "file" is in order to map it to object storage. Any system that shims a filesystem interface over the actual storage interface is going to have a similar impedance mismatch (e.g. s3fs or anything like that).

All that being said, I agree that nearly all of the changes you've proposed are good; the only one I think we won't do is grouping of files (unless we find a way to shim a "special directory" as a grouping, but that sounds hack-y).

To rephrase your proposal, I believe you're suggesting we implement a custom operator (SkyhookSourceNode) to facilitate data flow between the client and server over the network. The input to the custom operator is executed by Skyhook and the results are handled by the client node.

I think the catalog can still be resolved on the client side and the custom operator can be used in many connections (every FileFragment is an independent access to a potentially distinct ceph server) and this would only require very simple ExecPlan transformations.

@drin
Copy link
Contributor Author

drin commented Mar 19, 2024

I also want to note that pushing any more operators below the SkyhookSourceNode would be done blindly without some type of optimizer making that decision. So, the new operator makes sense when inserted above Read operators and that would be the sensible first step.

@bkietz would you know how Acero serializes custom operators into substrait? Does it use a message that is only known internally? I have been working with custom substrait messages and SkyhookSourceNode is a perfect example of where it seems to make sense to use one (and I have something similar I can use here

@bkietz
Copy link
Member

bkietz commented Mar 19, 2024

I believe that the Skyhook file format (as implemented) is a contract that when files are written in a given format (arrow IPC) using a standard posix filesystem, they can be read using a different interface (arrow dataset).

The problem is that the contract of skyhook as written is not the contract of a file format. File formats are intended to be an orthogonal detail to I/O. It would be possible to write a subclass of arrow::Array which is mutable, but although that is possible the broken contract of immutability will severely restrict its usage in the arrow library.

To rephrase your proposal, I believe you're suggesting we implement a custom operator (SkyhookSourceNode) to facilitate data flow between the client and server over the network. The input to the custom operator is executed by Skyhook and the results are handled by the client node.

Yes, SkyhookSourceNode is a client side node which proxies the server side plan in the client side plan.

(every FileFragment is an independent access to a potentially distinct ceph server)

I would recommend instead having a 1:1 relationship between SkyhookSourceNodes and ceph servers. If multiple ceph servers are in play, a UnionNode can be used in the client side plan to concatenate their streams.

would you know how Acero serializes custom operators into substrait?

The design above does not require serializing custom nodes, since it only requires serialization of the server side plan. The SkyhookSourceNode will only appear in the client side plan.

I also want to note that pushing any more operators below the SkyhookSourceNode would be done blindly without some type of optimizer

Optimization and other restructuring of plans is not currently in scope for acero. Instead the exact plan specified is what will be executed. The roadmap hope is that acero's substrait support will continue to improve and that cross-engine plan optimizers will be written against substrait

@drin
Copy link
Contributor Author

drin commented Mar 19, 2024

The hope is that acero's substrait support will continue to improve and that cross-engine plan optimizers will be written against substrait

I am working on enabling cross-engine plan optimizers, so that's not a general issue, just a necessary mechanism to enable specifying arbitrary computation to skyhook.

I would recommend instead having a 1:1 relationship between SkyhookSourceNodes and ceph servers. If multiple ceph servers are in play, a UnionNode can be used in the client side plan to concatenate their streams

Edit: I realize I misread this a bit earlier. 1:1 between SkyhookSourceNodes and ceph servers is what I was mentioning, but that still requires many connections unless it's possible to reuse one pipe for many servers.

The design above does not require serializing custom nodes, since it only requires serialization of the server side plan

Then I may be misunderstanding. If SkyhookSourceNode is a client-only node, then it seems like you're proposing execution of 2 independent ExecPlans, and not the use of SkyhookSourceNode for facilitating network communication in the execution of a single ExecPlan, correct? In which case I suppose I can understand the recommendation of a UnionNode, but I am not sure it would even be needed if execution on the server side and client side are independent.

@drin
Copy link
Contributor Author

drin commented Mar 19, 2024

File formats are intended to be an orthogonal detail to I/O.

I won't belabor the point because I understand you to mean this for the Arrow library in particular. I was just mentioning that file format is not orthogonal if a storage system wants to improve performance. Parquet is an example where it is not orthogonal, the point of RowGroups is to allow portions of a file to be accessible independent of the larger file.

But this was meant to be a broad comment, not on how the dataset API should be designed; I totally agree that we want to remove the skyhook file format if it's hindering Arrow's development.

Actually, it occurs to me that skyhook file format should really have been more like a file reader and the difference from the parquet reader is that it can execute compute, etc. on each fragment. I guess another anecdote that the abstractions are not very clean cut.

@bkietz
Copy link
Member

bkietz commented Mar 20, 2024

If SkyhookSourceNode is a client-only node, then it seems like you're proposing execution of 2 independent ExecPlans

Precisely; a server side plan which reads files and performs pushed-down compute finally pushing batches to the client, and a client side plan which receives these batches and performs other computation on them. This is analogous to the current structure of Skyhook which uses arrow datasets on the server side to scan/filter/project/transmit a Table to the client, and on the client side to act as a data source.

For example: the server side plan might read a set of parquet files with a pushed down filter and perform aggregation while the client side plan includes a UnionNode and collects batches from the skyhook server and from a local dataset, collecting into a single Table:

# server side:
ScanNode(nyc-taxi/*.parquet)
  -> FilterNode(year==2016)
    -> AggregateNode(cost ON tag)
       -> SinkNode(push ceph::bufferlist to client)

# client side:
ScanNode(local-addenda.parquet) ---------------------------v
SkyhookSourceNode(receive ceph::bufferlist from server) -> UnionNode -> TableSinkNode

In this example, the client side plan includes a SkyhookSourceNode to facilitate network communication by forwarding batches from the server into the UnionNode. For another example, the client side plan could union streams from three different skyhook servers (since we have UnionNode, there's no need to complicate SkyhookSourceNode by forcing it to deal with multiple servers):

# client side:
SkyhookSourceNode(receive ceph::bufferlist from server A) ---v
SkyhookSourceNode(receive ceph::bufferlist from server B) -> UnionNode -> TableSinkNode
SkyhookSourceNode(receive ceph::bufferlist from server C) ---^

@bkietz
Copy link
Member

bkietz commented Mar 20, 2024

Actually, it occurs to me that skyhook file format should really have been more like a file reader and the difference from the parquet reader is that it can execute compute, etc. on each fragment. I guess another anecdote that the abstractions are not very clean cut.

This isn't explicitly documented, it's true. It might be useful to note that file readers and the file formats which wrap them can be pointed at a path on a file system or a specific buffer (FileSource) and are guaranteed to do no IO other than reading from that file system or that buffer.

@drin
Copy link
Contributor Author

drin commented Mar 20, 2024

one last thing I'm curious about, I assume that the sink node on the server side can't stream batches to the client? or is there a way to access ExecBatches from the sink node in a streaming manner?

@bkietz
Copy link
Member

bkietz commented Mar 20, 2024

The default sink node does allow access to the stream of batches as a AsyncGenerator<std::optional<ExecBatch>>. AsyncGenerator<T> is a sequence of Future<T>, and we have several utilities to consume and compose these. The simplest approach is probably to wrap the generator into a synchronous sequence with MakeGeneratorIterator and dedicate a thread to iterate through the synchronous sequence, pushing each batch

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants