Task assignment between Scheduler and Executors #1221

Closed
mingmwang opened this issue Nov 2, 2021 · 19 comments · Fixed by #1560
Labels
ballista · enhancement (New feature or request)

Comments

@mingmwang
Contributor

mingmwang commented Nov 2, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

When I read the code, I see that task assignment between executors and the Scheduler works by the executors continuously polling the Scheduler for work. If there is no task to run, the poll loop sleeps for 100 ms. I think a better way would be to let the Scheduler assign available tasks to selected executors, to make better use of CPU cores. The existing loop can remain for heartbeat purposes. A new RPC method between the executor and the Scheduler is needed for task assignment.
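
To make this concrete, here is a minimal sketch of what the executor side of such a push model could look like; ExecutorService, launch_task, and the slot handling below are hypothetical names for illustration, not existing Ballista APIs:

// Illustrative only: a stand-in for the new scheduler -> executor RPC.
trait ExecutorService {
  // The scheduler pushes a ready task; the executor either accepts it
  // into a free task slot or rejects it so the scheduler can retry
  // on another executor.
  fn launch_task(&mut self, task_id: u64, plan: Vec<u8>) -> Result<(), String>;
}

struct Executor {
  free_slots: usize,
}

impl ExecutorService for Executor {
  fn launch_task(&mut self, task_id: u64, _plan: Vec<u8>) -> Result<(), String> {
    if self.free_slots == 0 {
      return Err("no free task slots".to_string());
    }
    self.free_slots -= 1;
    println!("executor started task {}", task_id);
    Ok(())
  }
}

fn main() {
  // The existing poll loop would remain only as a heartbeat; task
  // delivery happens through this call instead of a 100 ms poll.
  let mut executor = Executor { free_slots: 2 };
  executor.launch_task(1, vec![]).unwrap();
}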


@mingmwang added the enhancement label Nov 2, 2021
@alamb
Contributor

alamb commented Nov 2, 2021

Maybe related to #700

cc @Dandandan

@alamb added the ballista label Nov 2, 2021
@Dandandan
Contributor

Maybe related to #700

cc @Dandandan

Yes, it's the same observation.
To me the proposed solution sounds like a good idea. FYI @andygrove

@jon-chuang

Hi, this seems interesting; I would love to try implementing it.

@jon-chuang

Here is the proposed design:

  • When an executor initially connects to the scheduler, it also tells the scheduler how many task slots it has. The amount of memory per task, as per [Epic] Optionally Limit memory used by DataFusion plan #587, could also be negotiated here.
  • As long as the executor is alive, the scheduler tries to send tasks to it, prioritizing the executors with the most slots available (see the sketch below).

Just wondering if the cardinality estimates/execution cost model could be used for more intelligent scheduling. Also wondering whether each task runs single-threaded or can exploit more cores on the system, and if so, whether tasks share a common threadpool or the cores are partitioned so that each of the executor's n slots gets 1/n of the cores.
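
For illustration, a minimal sketch of the scheduler-side slot bookkeeping the two points above imply; ExecutorMeta, register, pick_executor, and all field names are assumptions, not the actual Ballista structures:

use std::collections::HashMap;

// Hypothetical registration record; field names are illustrative only.
struct ExecutorMeta {
  total_slots: usize,
  available_slots: usize,
  mem_per_task_bytes: u64, // could be negotiated as per #587
}

#[derive(Default)]
struct Scheduler {
  executors: HashMap<String, ExecutorMeta>,
}

impl Scheduler {
  // Called when an executor first connects and reports its capacity.
  fn register(&mut self, id: String, slots: usize, mem_per_task_bytes: u64) {
    self.executors.insert(
      id,
      ExecutorMeta {
        total_slots: slots,
        available_slots: slots,
        mem_per_task_bytes,
      },
    );
  }

  // Pick the executor with the most free slots, as proposed above.
  fn pick_executor(&self) -> Option<&str> {
    self.executors
      .iter()
      .filter(|(_, m)| m.available_slots > 0)
      .max_by_key(|(_, m)| m.available_slots)
      .map(|(id, _)| id.as_str())
  }
}

fn main() {
  let mut scheduler = Scheduler::default();
  scheduler.register("executor-1".into(), 4, 1 << 30);
  scheduler.register("executor-2".into(), 8, 1 << 30);
  // The executor with the most free slots wins.
  assert_eq!(scheduler.pick_executor(), Some("executor-2"));
}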

@alamb
Contributor

alamb commented Nov 10, 2021

@jon-chuang sounds like a good start.

I think something else that the scheduler should be able to take advantage of in the future might be "data locality" -- that is if a plan looks like

(plan section 1) -- writes intermediate results --> (plan section 2)

It is likely advantageous in many cases to run section 1 and section 2 on the same executor, if possible, to avoid having to send ("reshuffle") the intermediate results around.
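
For illustration only, such a locality preference could be a small tie-breaker when choosing an executor for a stage; every name below is hypothetical rather than an existing Ballista API:

use std::collections::HashMap;

// Prefer an executor that already holds one of the stage's inputs,
// falling back to any executor with a free slot.
fn pick_executor_for_stage(
  input_partitions: &[u64],
  partition_location: &HashMap<u64, String>, // partition id -> executor id
  executors_with_slots: &[String],
) -> Option<String> {
  input_partitions
    .iter()
    .filter_map(|p| partition_location.get(p))
    .find(|e| executors_with_slots.contains(*e))
    .cloned()
    .or_else(|| executors_with_slots.first().cloned())
}

fn main() {
  let mut loc = HashMap::new();
  loc.insert(7, "executor-1".to_string());
  let free = vec!["executor-1".to_string(), "executor-2".to_string()];
  // A stage reading partition 7 lands on executor-1, avoiding a reshuffle.
  assert_eq!(
    pick_executor_for_stage(&[7], &loc, &free),
    Some("executor-1".to_string())
  );
}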

@jon-chuang

jon-chuang commented Nov 14, 2021

Regarding shuffling, I saw in some benchmarks for TiDB's distributed query engine (incidentally, also relying on columnar storage) that an MPP-style shuffle seemed to produce better results than the MapReduce style of Apache Spark. There are some open questions, such as whether Java could be the cause of this discrepancy, but maybe it's also worth thinking about how to optimize the shuffles.

I don't know enough about DataFusion to know if it takes into account data movement when generating query plans.

@mingmwang
Contributor Author

@jon-chuang sounds like a good start.

I think something else that the scheduler should be able to take advantage of in the future might be "data locality" -- that is if a plan looks like

(plan section 1) -- writes intermediate results --> (plan section 2)

It is likely advantageous in many cases to run section 1 and section 2 on the same executor, if possible, to avoid having to send ("reshuffle") the intermediate results around.

Can you please explain the "data locality" requirements a little more? I think for normal source tasks, which read data from remote storage (cloud storage or HDFS), there is no data locality. And for shuffle readers, which have to read data from all map tasks, there is no data locality either.

@mingmwang
Contributor Author

Regarding shuffling, I saw in some benchmarks for TiDB's distributed query engine (incidentally, also relying on columnar storage) that an MPP-style shuffle seemed to produce better results than the MapReduce style of Apache Spark. There are some open questions, such as whether Java could be the cause of this discrepancy, but maybe it's also worth thinking about how to optimize the shuffles.

I don't know enough about DataFusion to know if it takes into account data movement when generating query plans.

Actually, I'm working on an MPP-style shuffle implementation; most of the coding is done and I'm now testing it.
I'm not sure whether the community needs this feature or not.

@houqp
Member

houqp commented Nov 15, 2021

Actually, I'm working on an MPP-style shuffle implementation; most of the coding is done and I'm now testing it.
I'm not sure whether the community needs this feature or not.

I am very interested in this. Do you mind sharing it with us when it's ready?

@alamb
Contributor

alamb commented Nov 15, 2021

Can you please explain the "data locality" requirements a little more? I think for normal source tasks, which read data from remote storage (cloud storage or HDFS), there is no data locality. And for shuffle readers, which have to read data from all map tasks, there is no data locality either.

I was thinking of a plan such as the following. There may be cases when reshuffling between scan/filter and aggregate is worthwhile (e.g., to distribute the load better), but I think the cost of reshuffling will mostly end up dominating any savings.

                                                                      
                                                                      
        rest of plan                                                  
                                                                      
                                                                      
              │                                                       
              │                                                       
              │                                                       
┌ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ┐                                       
              ▼                                                       
│   ┌───────────────────┐     │                                       
    │   HashAggregate   │                                             
│   └───────────────────┘     │                                       
              │                               Data is not reshuffled  
│             │               │              between scan, filter and 
              ▼                  ◀ ─ ─ ─ ─ ─        aggregate         
│   ┌───────────────────┐     │                                       
    │      Filter       │                                             
│   └───────────────────┘     │                                       
              │                                                       
│             │               │                                       
              ▼                                                       
│   ┌───────────────────┐     │                                       
    │     TableScan     │                                             
│   └───────────────────┘     │                                       
                                                                      
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                       

@liukun4515
Contributor

liukun4515 commented Nov 18, 2021

Actually, I'm working on an MPP-style shuffle implementation; most of the coding is done and I'm now testing it.
I'm not sure whether the community needs this feature or not.

I am very interested in this. Do you mind sharing it with us when it's ready?

@mingmwang you can file a draft PR first.

@yahoNanJing
Contributor

Hi @alamb and @houqp, we have recently implemented an initial version of push-based task scheduling. Here's the design document; could you help review it?
https://docs.google.com/document/d/1Z1GO2A3bo7M_N26w_5t-9h3AhIPC2Huoh0j8jgwFETk/edit?usp=sharing

A PR is in progress.

@houqp
Member

houqp commented Jan 12, 2022

Thank you @yahoNanJing for writing up the design doc! I will take a close look at it this weekend 👍

@mingmwang
Contributor Author

👍

@alamb
Contributor

alamb commented Jan 13, 2022

I will also try to find some time to read this document, but it may not be for a few days.

@yahoNanJing
Contributor

An initial version of the PR is up: #1560.

@jon-chuang

jon-chuang commented Jan 14, 2022

Hi all, I've been working on a Rust API for the Ray distributed computing framework, which powers many popular Python ML libraries like RLLib, Ray Train, and Ray Tune.

The Rust API is currently nearing the end of the prototype phase and we are looking for real-world usage for the project. You can view the tracking issue: ray-project/ray#20609 and prototype progress: ray-project/ray#21572

I'm quite interested in exploring the use of Ray for highly performant and efficient scheduling of tasks for Ballista. Note that one can do locality-aware scheduling with Ray, which can perform well even without randomized data partitioning, etc., thus opening new possibilities for Ballista's performance.

A second advantage of Ray is that the API is simple, so we don't need to deal with networking code and communication protocols, which are difficult to maintain.

// This proc macro generates data marshalling, function registration 
// and internal ray::core API calls for the remote function
#[ray::remote]
fn my_task(..) {
  ..
}

fn main() {
  let obj = T::new();
  
  let id = ray::put::<T>(obj); // put the object into shared memory / object store
  
  // This can run on a remote node, 
  // as scheduled by the distributed scheduler
  let id2 = ray::task(my_task).remote(id); 
  
  let result = ray::get::<T2>(id2); // get object from shared memory
  
  println!("{:?}", result);
}

In the future, we are also interested in supporting GPU tasks via rustc's PTX backend, which can run on any NVIDIA GPU. So we could maybe accelerate Ballista the way RAPIDS accelerates Spark, etc., by converting physical operators into GPU kernels.

#[ray::remote(enable_for_gpu)]
fn my_compute_intensive_task(..) {
  arrow_data[ptx::idy() * N + ptx::idx()] = ..;
}

fn main() {
  let arrow_data = ray::task(load_distributed_data).remote(partition_id);
  ray::task(my_compute_intensive_task).as_gpu_task().remote(arrow_data);
}

Our plan is also to support zero-copy reading of (immutable) Arrow data directly from the object store (on the same node) across multiple tasks.

Do let me know if anyone is interested in this. I will be happy to chat.

You can also shoot me an email chuang {d0t} jon [AT] gmail - dott - com

@yjshen
Member

yjshen commented Jan 14, 2022

@jon-chuang Thanks for bringing this up. I may be mistaken about some aspects of Ray; please point them out.

IMHO, Ray is designed to ease the development of general-purpose distributed programs. It's more like "parallelize your machine learning code and run it on a cluster without pain", as in the code sample you provided above.

On the other hand, Ballista is meant to be a distributed SQL query engine; the code to distribute and run is quite limited, since it's all DataFusion's fixed set of physical operators. So what should I expect from a Ray integration? Does Ray provide core abilities like task scheduling, keepalive monitoring, straggler detection, and speculative task execution? Could I therefore easily build a distributed SQL engine on top of DataFusion with little effort?

@jon-chuang

jon-chuang commented Jan 14, 2022

@yjshen thanks for your questions

task scheduling, keepalive monitoring, straggler detection, and speculative task execution

  • Yes.
  • Yes, plus failure recovery at the task level. We also have a worker monitoring dashboard with basic resource-utilization info.
  • We do not have robust distributed tracing tools yet, but they are planned. As for scheduling, it does not currently take into account global information such as straggling in an execution DAG, nor try to prioritize bottlenecked tasks. However, we are looking into a priority mechanism for tasks, through which a user (or an external monitoring tool) could prioritize bottlenecked tasks.
  • Note that Ray will always try to schedule tasks if there are resources available. So if the dataframe/SQL operation does not have an all-to-all dependency, it will automatically proceed to the next stage. We also have plans to preempt workers in anticipation of OOM.

Could I therefore easily build a distributed SQL engine on top of DataFusion with little effort?

This is unclear to me, and requires more investigation. However, note that the distributed dataframe project Modin was built on top of Ray.

the code to distribute and run is quite limited, since it's all DataFusion's fixed set of physical operators.

Yes. I think the use case is perhaps incremental and interactive SQL queries that can take advantage of low-latency scheduling, for instance, backend serving for many queries (> 10-100 MOps) over a distributed dataset.

I think these workloads might currently be out of scope for Ballista, which is aimed at analytics just like Spark is, but it is interesting to consider.

For instance, time-series DBs and Materialize DB offer this sort of streaming SQL computation. Also consider something like NoriaDB, which is optimized for read-heavy serving workloads and offers incremental SQL computation.
