
Expose a static object store registry #1072

Closed
wants to merge 1 commit into from

Conversation

rdettai
Contributor

@rdettai rdettai commented Oct 6, 2021

Which issue does this PR close?

This is linked to #1010 #1062

Rationale for this change

Currently, we don't have a good way to make the ObjectStoreRegistry available in the various places of the code where it is needed. To set up its object stores, the registry needs access at compile time to the implementation code (which might live in another crate), and needs access to the proper configuration (such as credentials) when initialized. In Ballista in particular, we need the right registry configuration on both the Executors and the Scheduler.

Creating a singleton static object store registry is not ideal (it comes with the usual curses of statics), but it solves, at least temporarily, some of the issues:

  • To add a new object store, we now have a single point of configuration, the OBJECT_STORES initialization, that will propagate the configuration to all the places where the registry is required (local, scheduler, executor)
  • OBJECT_STORES can be accessed in any place of the code: logical plan or physical plan deserialization, provider creation...
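As a rough illustration of what the proposed static registry could look like, here is a minimal sketch using `std::sync::OnceLock`. The `ObjectStore` trait, `ObjectStoreRegistry` struct, and `LocalFs` store are simplified stand-ins for the real DataFusion types, not the actual API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Simplified stand-in for DataFusion's ObjectStore trait.
trait ObjectStore: Send + Sync {
    fn scheme(&self) -> &'static str;
}

// Simplified stand-in for ObjectStoreRegistry: schemes mapped to stores.
#[derive(Default)]
struct ObjectStoreRegistry {
    stores: Mutex<HashMap<String, Arc<dyn ObjectStore>>>,
}

impl ObjectStoreRegistry {
    fn register(&self, scheme: &str, store: Arc<dyn ObjectStore>) {
        self.stores.lock().unwrap().insert(scheme.to_string(), store);
    }

    fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
        self.stores.lock().unwrap().get(scheme).cloned()
    }
}

// The single point of configuration: every component (local context,
// scheduler, executor) resolves stores from this one static instance.
static OBJECT_STORES: OnceLock<ObjectStoreRegistry> = OnceLock::new();

fn object_stores() -> &'static ObjectStoreRegistry {
    OBJECT_STORES.get_or_init(ObjectStoreRegistry::default)
}

// Hypothetical store implementation used for the example.
struct LocalFs;
impl ObjectStore for LocalFs {
    fn scheme(&self) -> &'static str {
        "file"
    }
}

fn main() {
    // Registered once at startup, visible from any part of the code.
    object_stores().register("file", Arc::new(LocalFs));
    assert!(object_stores().get("file").is_some());
    println!("registered schemes resolve globally");
}
```

The drawback discussed below applies directly: the static is process-wide, so tests and multiple contexts in one process all share the same registry.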

What changes are included in this PR?

  • removing the registry from the ExecutionContextState
  • exposing a static ObjectStoreRegistry called OBJECT_STORES

Are there any user-facing changes?

  • removing the registry from the ExecutionContextState changes the config API, but that part was likely not in use yet

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Oct 6, 2021
@rdettai
Contributor Author

rdettai commented Oct 6, 2021

@yahoNanJing @alamb @houqp I created this PR as I noticed that this change was proposed as part of #1062 and I was also reaching a similar conclusion as part of rdettai#1. I think it is worth discussing it separately, as this solution comes with drawbacks.

@jorgecarleitao
Member

Could you elaborate on the root cause for requiring this static?

I was unable to reason about

to setup its object stores, the registry needs to have access to the implementation code (which might be in an other crate) at compile time and needs to have access to the proper configurations (such as credentials) when initialized

@rdettai
Contributor Author

rdettai commented Oct 6, 2021

Could you elaborate on the root cause for requiring this static?

The problem is mainly present in Ballista:

  • you need the ObjectStore in the ExecutionPlan and the TableProvider
  • both of these need to be serialized and deserialized (either in the scheduler or the executor)
  • how do we ensure that the right object stores are instantiated in the various components?
    • the object store implementation might be in a different crate, so we need to make sure that all components were compiled with the right dependencies
    • what does it even mean to serialize an object store?
  • referring to one common instance of the registry from all the different places helps with this

I am not really happy about this solution either, so any alternative solution is welcome!

@alamb
Contributor

alamb commented Oct 6, 2021

I am not really happy about this solution either, so any alternative solution is welcome!

I think the alternative would be to thread the ObjectStoreRegistry on some structure down to all the places it was needed (e.g. perhaps the ExecutionContext). This seems like it will require a substantial amount of plumbing.

Basically I don't see the ObjectStoreRegistry as something that is "owned" by the ExecutionPlan nodes; it is more like some execution-specific configuration they happen to have a shared reference to for convenience.

@houqp
Member

houqp commented Oct 7, 2021

Interesting, from the discussion we had in rdettai#1, I got the opposite impression on where we are heading ;P I thought the conclusion from that discussion was:

  1. We require all nodes in the ballista cluster to run binaries with the same set of object store crate/plugin statically compiled in
  2. We don't need to do object store serialization if we require all nodes to have the same object store config synced on disk or in environment variables. If all nodes have the same object store config, then calling ObjectStoreFoo::new should result in the same object store being provisioned assuming the object store crate versions are compatible across nodes.

With this in mind, do we still need a global singleton registry? The object store registry is already attached to the ExecutionContext today, so that should be good enough, right?

@rdettai
Contributor Author

rdettai commented Oct 7, 2021

I opened this PR to formalize the discussion, sorry if it brought in more confusion than clarity 😃. Let me try to summarize the full context.

  • The challenge comes from the fact that we would want to be able to plug in arbitrary object stores (possibly from external repositories as discussed in S3 Support #907)
  • For a local user of the Datafusion lib, this can be done by attaching the ObjectStoreRegistry to the ExecutionContext, and then registering any ObjectStore implementation to the context.
  • For a distributed user of Datafusion (like ballista), things are a bit more complicated:
    • (1) either we keep sourcing the ObjectStore instances from an ObjectStoreRegistry. The registry will need to be initialized coherently in each component of the application (in Ballista that means the executor and the scheduler), and the right plumbing needs to be added so that the registry instance is readily available wherever an ObjectStore instance is required. This is particularly annoying for the serde components, which are not attached to the ExecutionContext in any way. A static (ideally immutable) instance of ObjectStoreRegistry helps solve these issues: a coherent registry plus simplified plumbing.
    • (2) or we define at the serde level which object stores are available. Just as with the TableProvider and ExecutionPlan traits, where the Ballista serialization only recognizes a given list of implementations (namely CSV and Parquet), the scheduler / executor will instantiate object stores directly without the ObjectStoreRegistry (the serde calls ObjectStoreFoo::new directly). This means we won't be able to add new object stores through the context in Ballista.

If I understand correctly, @houqp is leaning toward solution (2). Even though it lacks flexibility, it uses the same mechanism Ballista is already using for table providers, so I would also go with it.

So with solution (2) the Ballista flow would be:

  • the object store is resolved from the ObjectStoreRegistry in the Datafusion context on the Ballista client
  • we try to serialize the logical plan
    • if the object store is not known by the serde: error
    • if it is known, the object store is serialized into a simple enum (we don't serialize further config such as credentials)
  • the deserialization calls ObjectStoreFoo::new directly, which might pick up configuration from its environment
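The flow above can be sketched roughly as follows. All names (`ObjectStoreKind`, `serialize`, `deserialize`, `S3Store`) are illustrative, not the actual Ballista serde code, and the credential handling is only hinted at in comments:

```rust
// Sketch of flow (2): serde only knows a fixed enum of stores, and
// deserialization re-instantiates them from local configuration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ObjectStoreKind {
    LocalFs,
    S3,
}

// Hypothetical concrete stores known to the serde layer.
struct LocalFs;
struct S3Store; // credentials would be picked up from the environment

enum AnyObjectStore {
    LocalFs(LocalFs),
    S3(S3Store),
}

// Serialization: map a known store scheme to its enum tag, or error out.
fn serialize(scheme: &str) -> Result<ObjectStoreKind, String> {
    match scheme {
        "file" => Ok(ObjectStoreKind::LocalFs),
        "s3" => Ok(ObjectStoreKind::S3),
        other => Err(format!("object store `{other}` not known to serde")),
    }
}

// Deserialization: call the concrete constructor directly; config such as
// credentials comes from the executor's own environment, not the wire.
fn deserialize(kind: ObjectStoreKind) -> AnyObjectStore {
    match kind {
        ObjectStoreKind::LocalFs => AnyObjectStore::LocalFs(LocalFs),
        ObjectStoreKind::S3 => AnyObjectStore::S3(S3Store),
    }
}

fn main() {
    assert!(serialize("s3").is_ok());
    assert!(serialize("gcs").is_err());
    let _store = deserialize(ObjectStoreKind::LocalFs);
    println!("unknown schemes error at serialization time");
}
```

This makes the lack of flexibility concrete: supporting a new store means extending the enum and both `match` arms, i.e. recompiling the serde layer.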

@alamb
Contributor

alamb commented Oct 7, 2021

I like this enumeration of possibilities (1) and (2). 👍

I think solution (2) is also a reasonable approach.

Another thought I had: if we want to do (1) without a static ObjectStoreRegistry, we could deserialize an ObjectStore reference to a PlaceholderObjectStore and then add a subsequent resolve_object_store pass over the deserialized ExecutionPlans to replace all PlaceholderObjectStore instances with an instance of the correct type.

Not the cleanest solution, but it would avoid trying to plumb something into the serde serialization
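A minimal sketch of this two-pass idea, assuming simplified stand-in types (`PlaceholderObjectStore`, `PlanStore`, and `resolve_object_store` are all hypothetical names, not DataFusion API):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for the ObjectStore trait.
trait ObjectStore {
    fn scheme(&self) -> &'static str;
}

struct LocalFs;
impl ObjectStore for LocalFs {
    fn scheme(&self) -> &'static str {
        "file"
    }
}

// What the serde layer emits: just the scheme, no live store.
struct PlaceholderObjectStore {
    scheme: String,
}

// A plan node's store reference, before and after resolution.
enum PlanStore {
    Placeholder(PlaceholderObjectStore),
    Resolved(Arc<dyn ObjectStore>),
}

// Second pass over the deserialized plan: swap placeholders for instances
// looked up in a registry that lives on the executor/scheduler side.
fn resolve_object_store(
    store: PlanStore,
    registry: &HashMap<String, Arc<dyn ObjectStore>>,
) -> Result<PlanStore, String> {
    match store {
        PlanStore::Placeholder(p) => registry
            .get(&p.scheme)
            .cloned()
            .map(PlanStore::Resolved)
            .ok_or(format!("no object store registered for `{}`", p.scheme)),
        resolved => Ok(resolved),
    }
}

fn main() {
    let mut registry: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
    registry.insert("file".into(), Arc::new(LocalFs));

    let plan = PlanStore::Placeholder(PlaceholderObjectStore { scheme: "file".into() });
    match resolve_object_store(plan, &registry) {
        Ok(PlanStore::Resolved(store)) => println!("resolved scheme: {}", store.scheme()),
        _ => panic!("placeholder should have resolved"),
    }
}
```

In a real implementation the resolve pass would walk the whole ExecutionPlan tree, but the per-node substitution would look like this.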

@rdettai
Contributor Author

rdettai commented Oct 8, 2021

Thank you all for your input! Closing this as I feel we have discarded the implementation with static.

@houqp
Member

houqp commented Oct 9, 2021

Thanks @rdettai for the detailed write-up! I think going static should be good enough to unblock our development in the short run. But I agree with you that this is just a short-term workaround. To make object stores truly pluggable, we need to serialize them into unique values, e.g. a URI scheme, stored as generic strings instead of an enum in protobuf. This way, if a user compiles in a new object store using a custom crate, they can still get it to work without having to change the ballista protobuf file.

In fact, I think we need to do the same thing for table provider as well, hardcoding table providers in protobuf leads to the same restriction. For example, it's not possible to use delta-rs's custom table provider with ballista at the moment.

Given that the current logical plan deserialization code only takes the serialized protobuf plan as input, I think we would have to go with the lazy two-pass deserialization approach proposed by @alamb. Alternatively, we can change the deserialization call in the scheduler to pass in the execution context. This will make it a lot easier to implement more dynamic deserialization logic for both object stores and table providers. I don't see a strong reason why we would want to avoid referencing the execution context during logical plan deserialization?

@alamb
Contributor

alamb commented Oct 10, 2021

In fact, I think we need to do the same thing for table provider as well, hardcoding table providers in protobuf leads to the same restriction.

This is a good point and makes sense to me.

I don't see a strong reason why we want to avoid referencing execution context during logical plan deserialization?

Likewise, I don't see any reason to avoid referencing ExecutionContext during logical plan serialization. The only potential challenge I could see is if the same LogicalPlan could be run several times (with different ExecutionContexts), as might be done with cached query plans in transaction processing systems.

@rdettai
Contributor Author

rdettai commented Oct 11, 2021

I don't see a strong reason why we want to avoid referencing execution context during logical plan deserialization?

The context contains a lot of different configurations, part of which will be copied into the logical plan and part of which won't. So it seems to me that it will be kind of hard to figure out which configurations need to be consistently set across the different execution contexts in the cluster (either through the boot-time config or through serialization along with the query), and which are only needed on the node where the plan is created. I guess that in that case, the context should be structured into multiple tiers:

  • local (ok if configured on client instance only)
    • user defined variables
  • to be serialized along the logical plan
    • ex: batch size
  • static (boot time, needs to be configured on all nodes separately)
    • ex: object_store_registry, optimizers
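The tiers above could be sketched as a struct split, assuming hypothetical names (`LocalConfig`, `PlanConfig`, `StaticConfig`, `config_for_wire`) that are illustrative only, not DataFusion API:

```rust
use std::collections::HashMap;

// Tier 1: local only -- stays on the client, never leaves it.
struct LocalConfig {
    user_defined_variables: HashMap<String, String>,
}

// Tier 2: serialized along with the logical plan.
#[derive(Clone)]
struct PlanConfig {
    batch_size: usize,
}

// Tier 3: static / boot time -- must be configured on every node
// separately (object store registry, optimizers, ...), represented here
// only by names since the real values are not serializable.
struct StaticConfig {
    object_store_schemes: Vec<String>,
    optimizer_names: Vec<String>,
}

struct ExecutionContextState {
    local: LocalConfig,
    plan: PlanConfig,
    static_: StaticConfig,
}

impl ExecutionContextState {
    // Only the plan tier travels with the serialized query.
    fn config_for_wire(&self) -> PlanConfig {
        self.plan.clone()
    }
}

fn main() {
    let state = ExecutionContextState {
        local: LocalConfig { user_defined_variables: HashMap::new() },
        plan: PlanConfig { batch_size: 8192 },
        static_: StaticConfig {
            object_store_schemes: vec!["file".into(), "s3".into()],
            optimizer_names: vec![],
        },
    };
    // The wire config carries batch_size but neither user variables
    // nor the static tier.
    assert_eq!(state.config_for_wire().batch_size, 8192);
    println!("only the plan tier is serialized");
}
```

The type system then enforces the tier boundaries: serde code can only see `PlanConfig`, so it cannot accidentally depend on node-local state.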

@houqp
Member

houqp commented Oct 11, 2021

I think splitting the execution context config into different tiers is a good idea to make it more maintainable. Then we can pass only the static-tier config to the plan deserialization code to make it deterministic.
