
[Proposal] Native parallel batch indexing #5543

Closed
jihoonson opened this issue Mar 27, 2018 · 14 comments
Comments

@jihoonson
Contributor

jihoonson commented Mar 27, 2018

We currently have two types of batch index tasks: the local index task and the Hadoop index task. A Spark task is also available as a third-party extension.

Motivation

All task types have some limitations. The Hadoop/Spark index task requires an external Hadoop/Spark cluster to run, and this dependency on external systems usually causes issues such as:

  1. users must operate an external cluster,
  2. the library versions used by Druid and Hadoop/Spark must match, and
  3. the external cluster must be large enough to run Druid batch jobs without degrading the performance of its other workloads.

Meeting all of these requirements has been painful for people who are just getting started with Druid.

The local index task has no dependency on external systems, but it runs with a single thread and thus is too slow for most practical use cases.

As a result, we need a new task type that satisfies both requirements: no dependency on external systems, and support for parallel indexing. This is the same approach we already take for Kafka ingestion.

Goals

The goal of this proposal is to introduce new parallel indexing methods. It is not meant to replace the existing Hadoop task; it is about giving Druid users more options.

  • Single phase parallel indexing without shuffling intermediate data
  • Two phase parallel indexing with shuffling intermediate data. This can be used for perfect rollup.
  • New indexing methods have no dependency on other systems and use only the resources of the Druid cluster (peons)
  • New indexing methods build on Druid's existing parallel task execution mechanism.
    • Every new task type is submitted to the overlord and treated as a normal task. There should be no special handling for these task types.

Design

Each new parallel indexing method (with or without shuffle) consists of two task types: a supervisor task and its worker tasks. Once a user submits a supervisor task to an overlord, it in turn submits its worker tasks to the overlord.

Worker tasks read input data and generate segments. The generated segments are pushed by the worker tasks. Once a worker finishes its work, it reports the list of pushed segments to the supervisor task.
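The worker-side flow described above can be sketched as follows. All names here (`run_worker_task`, `push_segment`, `report`, the fixed segment size) are illustrative assumptions, not actual Druid APIs:

```python
# Hypothetical sketch (not actual Druid code) of a worker task's lifecycle:
# read the assigned input split, build segments, push them, and report the
# pushed-segment list back to the supervisor.

def run_worker_task(input_split, push_segment, report):
    rows = list(input_split)
    # Group rows into fixed-size "segments" purely for illustration;
    # real segment boundaries depend on granularity and row counts.
    segments = [rows[i:i + 2] for i in range(0, len(rows), 2)]
    pushed = [push_segment(seg) for seg in segments]  # push each built segment
    report(pushed)  # hand the list of pushed segments back to the supervisor
    return pushed
```

The key point is that workers push segments themselves and only report metadata upward; the supervisor never touches segment data.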

The supervisor task monitors the worker task statuses. If a worker fails, the supervisor task retries it until the number of retries reaches a preconfigured threshold. Once all worker tasks succeed, the supervisor task collects the pushed segments from all workers and publishes them atomically.
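A minimal sketch of that monitor/retry/publish loop, assuming hypothetical `run_worker` and `publish_atomically` callables (not actual Druid interfaces):

```python
# Hypothetical sketch of the supervisor loop: retry each failed worker up
# to max_retries, then publish all collected segments in a single step.

def supervise(worker_specs, run_worker, publish_atomically, max_retries=3):
    all_segments = []
    for spec in worker_specs:
        for attempt in range(max_retries + 1):
            ok, segments = run_worker(spec)  # (success flag, pushed segments)
            if ok:
                all_segments.extend(segments)
                break
        else:
            # Retry budget exhausted: fail the whole supervisor task.
            raise RuntimeError(f"worker {spec} exceeded {max_retries} retries")
    publish_atomically(all_segments)  # all segments become visible, or none
    return all_segments
```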

In both indexing methods, the parallelism of the initial phase is determined by the input firehose. To support this, a splittable firehose is introduced. A splittable firehose is responsible for letting the supervisor task know how the input can be split, and the supervisor task generates worker tasks according to the splittable firehose implementation.
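The split-enumeration idea might look like the following sketch. The class name, the one-split-per-file policy, and `plan_workers` are all assumptions for illustration, not the proposal's actual interface:

```python
# Hypothetical sketch of a "splittable firehose": it enumerates input
# splits so the supervisor can create one worker task per split.

class SplittableFileFirehose:
    def __init__(self, files):
        self.files = files

    def get_splits(self):
        # One split per file here; a real implementation might split by
        # size, directory, or object-store prefix instead.
        return [[f] for f in self.files]

def plan_workers(firehose):
    # The supervisor derives its first-phase parallelism from the splits.
    return [{"split": split} for split in firehose.get_splits()]
```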

In two phase parallel indexing, the supervisor task submits the worker tasks of the second phase once the first phase completes. The second phase workers read the intermediate results of the first phase workers and generate segments. The parallelism of the second phase can be decided by the size of the intermediate data, so the supervisor must be able to collect the intermediate data sizes from all worker tasks and adjust the parallelism accordingly. To support shuffle, the intermediate results of the first phase must be kept until the second phase completes.
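One plausible way the supervisor could size the second phase from the reported intermediate data, sketched here with an assumed bytes-per-worker target (the heuristic and its parameter are not part of the proposal):

```python
# Hypothetical sizing heuristic: total intermediate bytes reported by
# first-phase workers divided by a target per-worker workload.
import math

def second_phase_parallelism(intermediate_sizes_bytes, target_bytes_per_worker):
    total = sum(intermediate_sizes_bytes)
    if total == 0:
        return 1  # always run at least one second-phase worker
    return math.ceil(total / target_bytes_per_worker)
```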

Implementation Plan

  • Single phase parallel indexing without shuffle
  • General shuffle system, available to both the indexing and querying systems in Druid
    - The shuffle system should be available for two phase parallel indexing.
    - The shuffle system should also be available for Druid's querying system. This can be used for faster query processing when the size of intermediate data is large.
  • Two phase parallel indexing with shuffle

Out of scope of this Proposal

  • Does not create a new general processing framework like Hadoop or Spark. We don't need to reinvent the wheel.
  • Does not replace the existing Hadoop tasks with new ones. They are still great.
  • Does not improve task scheduling in the overlord (e.g., something like YARN's fair scheduler). That is needed, but should be done separately.
  • Does not handle the supervisor task as a single point of failure. This might be addressed separately if needed in the future.
@drcrallen
Contributor

drcrallen commented Apr 5, 2018

@jihoonson This seems to focus on initial data ingestion, can you comment on how this could play into things like Merge tasks? or potentially reducing the fidelity of existing data with a new index spec?

(both possibly outside the scope of MVP)

@jihoonson
Contributor Author

This seems to focus on initial data ingestion, can you comment on how this could play into things like Merge tasks? or potentially reducing the fidelity of existing data with a new index spec?

@drcrallen well, this proposal is not for introducing a general parallel framework to make any task type parallelizable. Instead, it introduces new task types (and the required shuffle system) for parallel indexing. Since these new task types should allow any type of splittable firehose, they can be used for reindexing if we add a splittable IngestSegmentFirehose.

@himanshug
Contributor

Implementing the parallel indexing without shuffle by itself sounds useful for many users, e.g. in combination with #5238 for ingesting data from databases. So, 👍

I'll defer shuffle-related comments because there aren't enough low-level details about its implementation fleshed out yet. My guess is that it will probably end up looking very similar to Hadoop MR's. That is not necessarily a bad thing, given the advantage of it being supported out of the box in Druid.

@jihoonson
Contributor Author

@himanshug thanks for the comment. We may need a more detailed proposal for the shuffle system. Also, I'm not sure about sharing the same shuffle system between indexing and querying for now, because they have different requirements. I'll raise another issue for it later.

@davidzollo

davidzollo commented Nov 8, 2018

This is very useful! 👍👍👍
I'm going crazy because the library versions of Hadoop and Druid can't match.

@csimplestring

csimplestring commented Dec 23, 2018

Hi, does IngestSegmentFirehose work for the native parallel batch indexing?

@jihoonson
Contributor Author

Hi @csimplestring, yes it should work.

@sascha-coenen

This motion is AWESOME AWESOME AWESOME!!!!
Well done!! I cannot wait to see the two phase shuffle. This is SO much needed.

I read in the comments section of related PRs about why one would need yet another data processing framework and what would be the issues with Spark/Hadoop.
This puzzles me for the following reason:
I have tasked several people to find out how to combine batch processing and stream processing for Druid and although a lot of time was being sunk into the subject matter, not a SINGLE person was able to come up with a viable solution, myself included. Let it be said too, that we have been running a million dollar Druid cluster for several years now and keep trillions of records in it.
So neither are we new to Druid nor are we idiots and yet we keep scratching our heads about how to put the pieces together.

In my opinion, Druid needs native indexing support more than anything, especially in the context of finding a more wide-spread adoption and growing the community.

I very much hope that more and more people can join in this effort. Most database systems come with native DML support; competing products, such as MPP databases like Vertica, have native support for ingesting big-data workloads.
Having native batch indexing support in Druid would not only make Druid more competitive and an easier sell, but it is strategically also an enabler for advanced setups, like putting Druid on Kubernetes. Containerizing Hadoop/Spark alone is not easy and far from a small effort, and doing it in a way that lets such a setup play nicely with Druid requires handcrafting the whole setup.
The MiddleManager, however, can easily be containerized (although it would be even nicer if there weren't any peons, I guess), which in turn is a segue to co-locating different workloads on the same hardware. Achieving this for an ecosystem that encompasses Spark/Hadoop is something only large companies with deep pockets and a budget for in-house customization can manage.

The second most needed feature is OLAP cubing (materialized views) which was added to Druid 0.13 as a prototype recently but currently requires a Hadoop cluster. So folks who went with a Spark-based indexing cannot use it unless they reinvent the wheel by adding support for it too.
So in this sense, it is NOT the creation of a native processing framework that is "re-inventing the wheel"; on the contrary, it is precisely the previously chosen approach of relying on external processing frameworks that deserves this label.


I'm going crazy because the library versions of Hadoop and Druid can't match
+1


I'm not sure about sharing the same shuffle system by both indexing and querying now because they need different requirements.
+1
Great thinking on the part of jihoonson to propose this, but in the spirit of making baby steps, it seems one should first keep things simple by thinking about this in isolation. Whether and how existing subsystems of Druid could be unified can then be an unrelated follow-up research task.

@jihoonson
Contributor Author

@csimplestring, I'm sorry, but I was wrong. IngestSegmentFirehose is not available for native parallel indexing and will be implemented in #7048.

@stale

stale bot commented Jun 20, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Jun 20, 2019
@stale

stale bot commented Jul 4, 2019

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.

@stale stale bot closed this as completed Jul 4, 2019
@jihoonson jihoonson reopened this Jul 10, 2019
@stale

stale bot commented Jul 10, 2019

This issue is no longer marked as stale.

@github-actions

This issue has been marked as stale due to 280 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If this issue is still
relevant, please simply write any comment. Even if closed, you can still revive the
issue at any time or discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions bot added the stale label Jun 29, 2023
@github-actions
Copy link

This issue has been closed due to lack of activity. If you think that
is incorrect, or the issue requires additional review, you can revive the issue at
any time.

@github-actions github-actions bot closed this as not planned Jul 28, 2023