[Proposal] Druid On Yarn #4400

Closed
rongzhg opened this issue Jun 13, 2017 · 12 comments

@rongzhg

rongzhg commented Jun 13, 2017

Motivation:

In large Druid clusters with over 100 machines, there will be hundreds of workers, especially brokers and historical nodes. It's hard to monitor, add, or remove workers for certain roles automatically.

For example, if a worker dies unexpectedly, sysadmins need to restart it manually. The time gap in between affects the user experience.

Also, if more historical workers are needed, someone has to manually start additional workers on hosts.

Proposal:

With YARN in the picture, those problems can be solved well. Sysadmins no longer need to care about worker placement for any role; workers can be assigned and started automatically. Dead workers will be restarted promptly, and more workers can be added to a given role with a single command.

A Druid application master is responsible for starting workers and monitoring their status. Inside the application master, role-specific rules can be defined. For example, a worker should be restarted on the same host it ran on before, so that files already on disk can be reused.

@b-slim
Contributor

b-slim commented Jun 13, 2017

@RongZhang828 this is a great proposal, looking forward to seeing more details!

@drcrallen
Contributor

Just as a general FYI, we run on Marathon/Mesos using just Marathon app definitions.

@drcrallen
Contributor

Here is a list of challenges coming from someone using druid in a containerized world:

  • How do you handle the indexing service?
  • How do you handle live upgrades of an indexing service task?
  • How do you handle persistent volumes for historicals?
  • How do you make sure your container upgrade strategies are sane?
  • How do you properly isolate failure domains?
  • How do you minimize the impact of noisy neighbors?
  • Lots of thread pools are sized as some fraction of the total system CPU count; did you make sure to set them all correctly?
  • With TLS support in the works, how do you manage certs and CAs?
  • How do you scale up without having an operator poke a bunch of different settings?
  • How do you handle logging?
  • How do you look at metrics so you know if 100% cpu usage is ok or bad?

A useful solution can certainly be developed that addresses only a subset of these. Hopefully our experience will help ease adoption and maximize the utility of such a feature.

@rongzhg
Author

rongzhg commented Jun 22, 2017

Thanks a lot for your suggestions @drcrallen. Based on your input and my own experience, I came up with the solution below.

Druid roles can be categorized into two groups:

  • Controller: coordinator/middleManager -- only a limited number of workers per controller role in one cluster.
  • Worker: broker/historical -- a large number of workers is needed for one cluster with hundreds of machines.

So in the first stage of the Druid-on-YARN project, I only take the worker roles (broker/historical) into consideration.

Details as below:

[Image: Druid-on-YARN architecture diagram]

One application master per module. The flow is as follows (a code sketch of the container-request part follows the list):

  1. The Druid client sends a request to the YARN ResourceManager to start up the application master.
  2. The YARN ResourceManager starts the application master.
  3. The application master asks for resources for brokers/historicals according to certain strategies.
  4. The ResourceManager returns the resources, and the application master connects to the corresponding NodeManagers.
  5. The application master asks the related NodeManagers to start the brokers/historicals.
  6. Brokers/historicals register themselves in ZooKeeper after startup, for the failover scenario.
  7. The application master watches the related ZooKeeper path. If any broker/historical fails due to OOM or another exception, the application master will try to restart it.
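As a rough illustration of steps 3-5, here is a minimal sketch of an application master requesting and launching worker containers with Hadoop's synchronous YARN client API (distributed-shell style). This is not Druid code: the class name, resource sizes, and launch command are placeholders, and a real AM would also handle container failures, localize the Druid build, and track which role each container serves.

```java
// Minimal, illustrative app-master loop (not actual Druid code): ask the
// ResourceManager for N containers and start a historical in each one.
import java.util.Collections;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class DruidWorkerMaster {
  public static void main(String[] args) throws Exception {
    int numWorkers = Integer.parseInt(args[0]);
    YarnConfiguration conf = new YarnConfiguration();

    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    rmClient.registerApplicationMaster("", 0, "");   // the AM is now registered with the RM

    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Step 3: request containers for the worker role (sizes are placeholders).
    Resource capability = Resource.newInstance(32768, 8);
    Priority priority = Priority.newInstance(0);
    for (int i = 0; i < numWorkers; i++) {
      rmClient.addContainerRequest(new ContainerRequest(capability, null, null, priority));
    }

    // Steps 4-5: as the RM hands back containers, ask the NodeManagers to
    // start a historical process in each of them.
    int launched = 0;
    while (launched < numWorkers) {
      AllocateResponse response = rmClient.allocate(0.1f);
      for (Container container : response.getAllocatedContainers()) {
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList(
            "java -cp 'druid/lib/*:druid/conf/historical' io.druid.cli.Main server historical"
                + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
        nmClient.startContainer(container, ctx);
        launched++;
      }
      Thread.sleep(1000);
    }

    // A real AM would keep running to monitor workers; unregister only at shutdown.
    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "workers launched", "");
  }
}
```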

Currently, Druid has some constraints as a distributed system.

Broker

  1. Users need to connect to brokers to query Druid, so broker IPs and ports are exposed either to a VIP or to users directly. Consequently, broker IPs and ports can't be changed.
  2. If a VIP is used as the load balancer, all brokers would have to start on the same port. Thus only one worker can be started per node.

Historical

  1. A historical acts as a local cache for data, and it takes some time to download data from HDFS to local disk. So it's better to restart a historical on the same node during failover.
  2. Currently, all historicals use the same directory to store data, so only one historical can be started per node.

For the above reasons, Druid is not an ordinary distributed system. The Druid application master needs to support at least two worker distribution strategies:

  1. Exclusive -- one broker/historical per node
  2. Normal -- workers can be distributed on any node with enough resources

In the first stage, I'll focus on the exclusive strategy; the second one needs more Druid code modification. Also, for failover, a worker will be restarted on the same node it was running on before it died abnormally (a sketch follows).
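To illustrate the same-node failover idea, here is a sketch that watches the workers' ZooKeeper announcements with Apache Curator and, when a worker's ephemeral node disappears, asks YARN for a replacement container pinned to the host the worker was on. The ZK path, resource sizes, and the workerHostFor() helper are assumptions for illustration; passing relaxLocality = false in the ContainerRequest is what restricts the request to the named host.

```java
// Sketch: watch the workers' ZooKeeper path and, when a worker's ephemeral
// node disappears, ask YARN for a replacement container pinned to the same host.
// The ZK path, resource sizes, and workerHostFor() helper are illustrative only.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class WorkerFailoverWatcher {
  private final AMRMClient<ContainerRequest> rmClient;

  public WorkerFailoverWatcher(AMRMClient<ContainerRequest> rmClient) {
    this.rmClient = rmClient;
  }

  public void watch(String zkConnect, String announcementsPath) throws Exception {
    CuratorFramework zk = CuratorFrameworkFactory.newClient(
        zkConnect, new ExponentialBackoffRetry(1000, 3));
    zk.start();

    PathChildrenCache cache = new PathChildrenCache(zk, announcementsPath, true);
    cache.getListenable().addListener((client, event) -> {
      if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
        // The worker's ephemeral node is gone: request a replacement container
        // on the exact host it was running on (relaxLocality = false).
        String host = workerHostFor(event.getData().getPath());
        Resource capability = Resource.newInstance(32768, 8);
        rmClient.addContainerRequest(new ContainerRequest(
            capability, new String[]{host}, null, Priority.newInstance(0), false));
      }
    });
    cache.start();
  }

  // Hypothetical helper: in a real AM this mapping would come from the
  // worker's announcement payload or the AM's own bookkeeping.
  private String workerHostFor(String zkPath) {
    return zkPath.substring(zkPath.lastIndexOf('/') + 1).split(":")[0];
  }
}
```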

In the long term, Druid brokers and historicals should be able to support the normal strategy, and the solutions to these problems are quite simple.

[Image: broker query-routing diagram for Druid on YARN]

  • For brokers, use a proxy to distribute the queries.
  • For historicals, the application master will assign a data directory to each historical.

These functions would probably be done in stage two.

Let me know if you have any questions or ideas.

@b-slim
Contributor

b-slim commented Jun 22, 2017

@RongZhang828 This is a great proposal!
I would like to note that the middleManager can be a worker as well. In fact, as opposed to the overlord, middleManager nodes do realtime data ingestion and can be part of the query path. I think you can add the Overlord to the pool of controllers.
Also, with Druid 0.10 we can run the Overlord and Coordinator within the same JVM, so that might make this more straightforward.

@b-slim
Contributor

b-slim commented Jun 22, 2017

For query routing we do have a Druid router node that can be used. The router discovers brokers via ZK service discovery, so this can be a viable way to hide the complexity of having a fixed pool of IPs/ports for brokers.

@rongzhg
Author

rongzhg commented Jun 26, 2017

I'm working on it now and hopefully will finish it in the next few weeks :)

@egor-ryashin
Contributor

egor-ryashin commented Jun 26, 2017

Interesting proposal, I think the following things should be considered closely:

  • Rolling restart capability. For example, Mesosphere Marathon allows you to keep a percentage of the cluster up while doing an upgrade/reconfiguration/maintenance restart. That is very important for historical clusters.
  • Readiness check period. When you do a rollout and need to keep part of the cluster up, you need some status to decide whether a new node is ready and healthy before proceeding with the rest of the cluster. For example, a historical needs some time to read its cache from disk storage; that period should be treated as neither healthy nor unhealthy. If we treat it as healthy, we will restart the whole cluster at once (a huge strain on the coordinator, and possibly general unavailability of the service). If we treat it as unhealthy, and cache loading takes long enough, the task will fail the health check and be restarted mid-load. So health checks should trigger a restart on node failure, and readiness checks should delay a rollout until new nodes can serve their loaded segments.
  • I think that even if you aren't going to implement multiple Druid containers running on a single node, you have to plan the architecture for that from the start, so that when you do implement it you don't have to rewrite most of your previous work. How do you plan to provide port/path parameters in that case, for example?
  • I think thread pool sizes should be derived dynamically from the number of allocated vcores (see the sketch after this list).
  • What capabilities do you plan for the tracking UI?
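For illustration of the thread-pool point above: the app master knows how many vcores it allocated to each container, so it could hand that number to the worker (for example through an environment variable set in the ContainerLaunchContext) and derive the processing thread count from it rather than from the host's CPU count. DRUID_ALLOCATED_VCORES is a made-up variable name; druid.processing.numThreads is a real Druid property, but the sizing rule here is just an example.

```java
// Illustration only: size Druid's processing pool from the vcores the AM
// allocated, rather than from the host's CPU count. DRUID_ALLOCATED_VCORES
// is a hypothetical variable the AM would set in the ContainerLaunchContext.
public final class ProcessingThreadSizer {
  public static int processingThreads() {
    String allocated = System.getenv("DRUID_ALLOCATED_VCORES");
    int vcores = allocated != null
        ? Integer.parseInt(allocated)
        : Runtime.getRuntime().availableProcessors(); // fallback: host CPU count
    // Druid's usual guidance for druid.processing.numThreads is roughly (cores - 1).
    return Math.max(1, vcores - 1);
  }

  public static void main(String[] args) {
    // The worker launch script could inject this value, e.g.
    // -Ddruid.processing.numThreads=<value>, before starting the historical.
    System.out.println("druid.processing.numThreads=" + processingThreads());
  }
}
```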

Also, I don't know whether your MiddleManager cluster uses the indexer auto-scaling feature. If it does, why do you need YARN for the MiddleManagers?
Do you plan to implement any kind of resource isolation?
Did you consider Mesos?

@rongzhg
Author

rongzhg commented Jun 27, 2017

Well, for now we use YARN in production, and Druid will most likely run on Hadoop clusters alongside other applications such as Flink. So currently I'd like to implement Druid on YARN first.

  • The app master can support several different startup strategies:

    1. exclusive -- one container per node
    2. common -- a container can be started on any node with enough resources

    More strategies could also be added dynamically with limited code change.

    The second strategy depends on a critical feature: configuration needs to be replaced by the app master at worker startup.

    For example: the historical data directory and the thread pool sizes. With multi-tenancy taken into consideration, even the ZK path needs to be set by the app master.

    The design for this config-replace feature is to separate the binary build from the config. The configuration will be replaced and merged into the binary build by the app master before worker startup (see the sketch at the end of this comment).

  • Resource isolation
    As far as I know, YARN provides resource isolation via cgroups.

  • MiddleManager
    For now, the MiddleManager is considered a controller role, so it won't be scheduled by YARN.

  • Rolling restart
    Rolling restart is a great suggestion.

    1. In my design, one app master is designated for one role, so sysadmins need to decide which role to upgrade.
    2. Upgrade the brokers/historicals.

    Currently, the strategy is rather simple: split the workers into several batches and upgrade the batches one by one with a time interval of n. The batch size and time interval are decided by sysadmins based on their experience.

    I can't find a good way to check the status of brokers and historicals; to my understanding, there's currently no interface to get worker status.

  • Tracking UI
    Well, I haven't thought about it thoroughly yet, as the YARN UI is enough for me for now. I would really appreciate any ideas you have.
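Following up on the config-replace feature described in the first bullet, here is a minimal sketch of how the app master could merge per-worker overrides (port, data directory, thread count, ZK base path) into the runtime.properties shipped with the binary build before the worker starts. The property keys are real Druid properties, but the file paths and override values are placeholders.

```java
// Sketch of the config-replace step: merge per-worker overrides into the
// runtime.properties that ships with the Druid build, then write the result
// into the container's working directory before the worker process starts.
// File paths and the override values are illustrative.
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public final class WorkerConfigMerger {
  public static void merge(String baseProps, String mergedProps,
                           Map<String, String> overrides) throws IOException {
    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream(baseProps)) {
      props.load(in);                       // config shipped with the binary build
    }
    props.putAll(overrides);                // per-worker values decided by the AM
    try (FileOutputStream out = new FileOutputStream(mergedProps)) {
      props.store(out, "generated by the Druid application master");
    }
  }

  public static void main(String[] args) throws IOException {
    // Example overrides for one historical worker; values are made up (Java 9+ Map.of).
    merge("conf/druid/historical/runtime.properties.template",
          "conf/druid/historical/runtime.properties",
          Map.of(
              "druid.port", "8083",
              "druid.processing.numThreads", "7",
              "druid.segmentCache.locations",
              "[{\"path\":\"/data/druid/historical-1\",\"maxSize\":300000000000}]",
              "druid.zk.paths.base", "/druid/cluster-a"));
  }
}
```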

@tekn0ir

tekn0ir commented May 17, 2018

Have you considered Kubernetes? My guess is that it could replace ZooKeeper as well.

@stale

stale bot commented Jun 21, 2019

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Jun 21, 2019
@stale

stale bot commented Jul 5, 2019

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.
