This repository has been archived by the owner on Apr 20, 2021. It is now read-only.

[Design] High level API proposal for ballista #463

Closed
edrevo opened this issue Feb 2, 2021 · 6 comments
@edrevo
Collaborator

edrevo commented Feb 2, 2021

I've been thinking lately about what the best API is for the different parts of ballista. For now, I'm still tabling the discussion around resource managers (K8s, Mesos, YARN, etc.) and I'll focus on scheduler, executors and clients.

I see that right now the executors implement the flight protocol, which makes perfect sense for arrow data transmission. I think this is a great fit for the "data plane" in ballista: executor<->executor communication and executor <-> client communication (for .collect).

When it comes to the "control plane", though, I think stream-based mechanisms like the flight protocol (or even the bidirectional streams in gRPC) aren't great: they pin the client's communication to a specific server (the scheduler, in this case), which makes it harder to dynamically increase the number of instances in a scheduler cluster (you could increase the instances, but all existing clients would still be talking to the same initial scheduler).

I also think that unary RPCs are easier to implement, and have a higher degree of self-documentation through the protobuf definition.

So, here's my proposal for the control plane:

[image: control plane diagram]

The arrows go from client to server in the image above.

The scheduler would have an API that would look something like this:

service SchedulerGrpc {
  // This is the only call that executors need. It works similarly to how Kafka's consumers/producers poll the brokers.
  // The executor needs to poll the scheduler through this method every X seconds to be part of the cluster.
  // As a reply, it gets information about the cluster state and configuration, along with a potential work item
  // that needs to be added to the executor's work queue.
  rpc PollWork (ExecutorState) returns (GetExecutorMetadataResult) {}


  // These methods are for scheduler-client interaction. These are the methods that would need to be called from
  // any client wrappers we want to provide, so they should be kept as simple as possible.

  rpc Execute (Action) returns (JobId) {}

  // JobStatus includes metadata about the job's progress along with the necessary Flight tickets to be able to
  // retrieve results through the data plane
  rpc GetJobStatus (JobId) returns (JobStatus) {}
}

I can drill down into the message definitions if you want, but I'm still not 100% sure of the complete set of information that needs to go in there (I know part of it).
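
Just to make it a bit more concrete, here is one possible shape for those messages. Everything below is a rough sketch: all field names and types are illustrative guesses on my part, not a final definition.

syntax = "proto3";

// Rough sketch only -- field names and types are illustrative, not final.

message ExecutorState {
  string executor_id = 1;
  string flight_host = 2;               // data plane (Flight) endpoint of this executor
  uint32 flight_port = 3;
  uint32 available_task_slots = 4;      // how much more work the executor can take
}

message GetExecutorMetadataResult {
  repeated ExecutorState cluster_state = 1;  // current view of the cluster and its configuration
  bytes task = 2;                            // optional serialized work item for the executor's queue (empty if none)
}

message JobId {
  string id = 1;
}

message JobStatus {
  JobId job_id = 1;
  string state = 2;                     // e.g. QUEUED / RUNNING / COMPLETED / FAILED
  repeated bytes flight_tickets = 3;    // one Flight ticket per result partition
  repeated string flight_locations = 4; // executor endpoint where each ticket can be redeemed
}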

The proposal for data plane is much simpler:

[image: data plane diagram]

All of these would be based on the Flight Protocol, but we wouldn't use any of the control messages that flight offers (i.e. no DoAction).
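
One nice consequence of keeping the data plane DoGet-only is that the Flight ticket itself can carry everything needed to locate a partition. Purely as an illustration (not a final definition), the opaque ticket bytes could be a serialized message like this:

syntax = "proto3";

// Illustrative sketch: what the opaque Flight ticket could encode so that
// DoGet is the only Flight call the data plane needs.
message PartitionTicket {
  string job_id = 1;
  uint32 stage_id = 2;
  uint32 partition_id = 3;
}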

Thoughts?

@edrevo
Collaborator Author

edrevo commented Feb 2, 2021

This proposal would fix #457, by the way.

@jc4x4
Contributor

jc4x4 commented Feb 11, 2021

Here is a post from AWS that explains why it is better for the control plane (scheduler) to contact the data plane (executor):
https://aws.amazon.com/builders-library/avoiding-overload-in-distributed-systems-by-putting-the-smaller-service-in-control/

As a safety mechanism, the executor would track progress (e.g. how many bytes have been scanned) and have some watchdog mechanisms (e.g. CPU/memory too high). Then, when the scheduler contacts it, the scheduler has more information and can decide whether to let another executor take over, fail the job, or interrupt the current task and re-partition it across other executors.
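
Just as a sketch of what I mean (these names aren't tied to any actual message definitions), the executor could report something like this every time it talks to the scheduler:

syntax = "proto3";

// Sketch only: progress/watchdog information the executor could report so the
// scheduler has enough context to fail, reassign or re-partition a task.
message ExecutorHealthReport {
  string executor_id = 1;
  uint64 bytes_scanned = 2;        // progress on the current task
  float cpu_utilization = 3;       // 0.0 - 1.0
  uint64 memory_used_bytes = 4;
  bool overloaded = 5;             // watchdog verdict, e.g. CPU or memory too high
}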

Btw, I'm not very familiar with the distributed execution path yet. Is that merged into mainline? What would be a good reference point? Does the scheduler already know the addresses of all workers that will participate in a query when it starts scheduling?

@edrevo
Collaborator Author

edrevo commented Feb 12, 2021

That was an interesting read, thanks for the link! Distributed execution in Ballista 0.4 has just been merged in #491. There's still progress to be made there, so right now there's no fault tolerance for executor failures, for example.

One difference between what is mentioned in the AWS article you linked and ballista is that control messages in ballista need to flow bidirectionally: the scheduler will send work to the executors, but it is equally important for the executors to be able to tell the scheduler that they have finished their job and whether they can accept more workload. That means that the API call rate is never going to be 100% controlled by the scheduler.

I do like the strategy of opening a long-lived connection from the executor and using that as a proxy for liveness. There's some increased complexity, since the scheduler will have to make a conscious effort to decide how many connections it wants. It also makes auto-scaling the scheduler cluster more difficult, since adding a new instance won't immediately remove any load from the existing schedulers.

The design I was proposing mimics what Kafka does, which is reportedly able to handle 10k connections per broker. If I find some time, I might run some benchmarks on the current code to see what the scalability of a single scheduler looks like right now.

@Shamazo
Contributor

Shamazo commented Feb 20, 2021

@edrevo Have you thought about how the scheduler cluster might maintain shared state? I am thinking about things like partition statistics and possibly the provenance (and location) of partitions, i.e. the chain of operations that produced a partition, which could be useful in saving work for future queries.

I am thinking schedulers could be in a raft (or your favourite consensus algorithm) cluster.
edit: etcd could also fulfill this functionality

@edrevo
Collaborator Author

edrevo commented Feb 21, 2021

Right now the scheduler maintains its state either through "standalone mode", which doesn't support running a cluster of schedulers (so no HA), or through etcd.

Once #574 is merged, the shared state will contain information about each job, stage and task/partition that has been submitted to the cluster, including which executor is handling each partition and where the results can be fetched from.
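
Purely to illustrate the kind of entries I mean (this is a sketch, not what #574 actually defines), the persisted state for a single task could look roughly like this:

syntax = "proto3";

// Illustrative sketch only -- not the actual definitions from #574.
message PersistedTaskStatus {
  string job_id = 1;
  uint32 stage_id = 2;
  uint32 partition_id = 3;
  string executor_id = 4;       // which executor is handling this partition
  string result_location = 5;   // host:port where the partition results can be fetched from
}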

Etcd uses the Raft protocol behind the scenes, but using Raft directly in the scheduler and avoiding the external dependency on etcd sounds great too. The way to implement this would be to create a new ConfigBackendClient which does the necessary service discovery to find the other scheduler nodes and uses the Raft protocol directly. Maybe that could be done by extending the Standalone config backend instead of implementing a new one.

@Shamazo
Contributor

Shamazo commented Feb 21, 2021

Thank you, that helps clarify my mental model of the design. I am curious about the performance implications of running our own raft cluster vs using etcd. Using raft in the standalone mode would certainly add a lot more complexity.
