
User-friendly Hadoop-based re-indexing/compaction #1517

Closed

gianm opened this issue Jul 14, 2015 · 9 comments

@gianm
Contributor

gianm commented Jul 14, 2015

#1374 implements reindexing of Druid segments using the Hadoop indexer. There should be some user-friendly way of doing this on an ongoing basis (for cleaning up realtime-ingested segments).

I think this could be done as a "HadoopReindexTask" that takes a really simple spec: just "dataSource", "intervals", and tuning options like a partitionsSpec. It would then scan the timeline for the provided "intervals", find sections where there are segments that are unbalanced (just by looking at their metadata, not by actually opening them), and then reindex those sections. The idea is that if segments in those intervals are fine the way they are, the task should do nothing; likewise, if you run the task twice, it shouldn't do anything the second time.

It could be useful to have an override that forces the reindexing to always happen, for cases where the automatic logic doesn't make the decision you wanted.

I think to be really user-friendly this will depend on storing aggregatorFactories in segment metadata (#1514), since otherwise you'd have to provide them to the HadoopReindexTask.

If this were implemented then it could be automated by just regularly submitting HadoopReindexTasks with intervals = 0000 to (now - X) where X is a few days or whatever.
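As a rough sketch of that automation (assuming Joda-Time, which Druid uses; `submitHadoopReindexTask` is a hypothetical helper standing in for submitting the task to the overlord):

```java
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class PeriodicReindexer
{
  // X: leave the most recent few days alone, since realtime ingestion
  // may still be producing segments there.
  private static final int SKIP_RECENT_DAYS = 3;

  // Run this on a schedule (e.g. daily) to keep older data compacted.
  public void submitPeriodicReindex(String dataSource)
  {
    Interval intervalToReindex = new Interval(
        new DateTime(0, 1, 1, 0, 0),                // "0000", i.e. the beginning of time
        DateTime.now().minusDays(SKIP_RECENT_DAYS)  // now - X
    );
    submitHadoopReindexTask(dataSource, intervalToReindex);
  }

  private void submitHadoopReindexTask(String dataSource, Interval interval)
  {
    // hypothetical: POST the HadoopReindexTask spec to the overlord here
  }
}
```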

@himanshug
Contributor

@guobingkun can you please look into this?

@guobingkun
Contributor

@himanshug sure, will do.

@xvrl
Member

xvrl commented Sep 10, 2015

@gianm would this essentially be a "HadoopMergeTask"?

@gianm
Contributor Author

gianm commented Sep 10, 2015

@xvrl sorta, except hopefully better in a few ways:

  1. you can provide intervals rather than segments
  2. it's idempotent
  3. ideally you don't have to provide the aggregator specs to the task (this depends on Store aggregatorFactories in segment metadata #1514)

@himanshug
Contributor

@xvrl @gianm
From talking to @guobingkun, I am imagining that "HadoopMergeTask" will be a simple wrapper around "HadoopIndexTask": a convenient way of calling "HadoopIndexTask" with a dataSource pathSpec, with some auto-filling (such as of granularity, aggregators, etc.).
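A minimal sketch of that wrapper idea, using simplified, hypothetical stand-in types (none of these signatures come from Druid itself):

```java
import java.util.List;

// Everything below is a hypothetical, simplified stand-in for illustration;
// Druid's real HadoopIndexTask and spec classes look quite different.
public class HadoopMergeTaskSketch
{
  // Stand-in for the full Hadoop indexing spec.
  record IndexSpec(String dataSource, List<String> intervals,
                   String segmentGranularity, List<String> aggregators) {}

  // Stand-in for the existing Hadoop index task.
  static class HadoopIndexTask
  {
    private final IndexSpec spec;
    HadoopIndexTask(IndexSpec spec) { this.spec = spec; }
    void run() { System.out.println("reindexing with spec: " + spec); }
  }

  // The proposed merge task: the user supplies only dataSource + intervals.
  // Granularity and aggregators are auto-filled from segment metadata (which
  // is where #1514 comes in), and the input is a "dataSource" pathSpec, i.e.
  // the indexer reads from existing segments of the same dataSource.
  static void runMergeTask(String dataSource, List<String> intervals)
  {
    String granularity = lookupGranularityFromMetadata(dataSource);  // hypothetical
    List<String> aggs = lookupAggregatorsFromMetadata(dataSource);   // hypothetical
    new HadoopIndexTask(new IndexSpec(dataSource, intervals, granularity, aggs)).run();
  }

  static String lookupGranularityFromMetadata(String dataSource) { return "DAY"; }
  static List<String> lookupAggregatorsFromMetadata(String dataSource) { return List.of("count"); }
}
```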

@guobingkun
Contributor

I have been thinking of letting the Coordinator find unbalanced segment sections and submit a HadoopReindexTask for those sections.

Basically, we can create a runnable that Coordinator periodically executes.

At the start of this runnable, for each dataSource, the Coordinator will build a timeline of segments; then we tag each shard of each segment with either S or O:

S: the size of shard is below optimal size
O: the size of shard >= optimal size

The optimal size should be configured externally; we could probably just reuse the Coordinator's "mergeBytesLimit".

Here is a timeline of segments for a specific dataSource (Note: The segment granularity for each segment could be arbitrary)

SegID:    1   2   3   4   5   6   7   8   9  10  11  12   13
Shard0: |_O_|_O_|_O_|_O_|_S_|_S_|_S_|_O_|_S_|_O_|_S_|_O_|_S_|__|__|__|__|__|__|__|
Shard1:         |_S_|                   |_S_|   |_S_|_S_|
Shard2:         |_S_|                   |_S_|   |_S_|

The Coordinator will periodically scan this timeline and submit a HadoopReindexTask for unbalanced sections. The end result we want is that no shards in the timeline are tagged with S.

Here is a relatively concrete algorithm that the Coordinator could use for this runnable:

currTotalSize = 0
currInterval = null
shouldMergeNow = false

for each segment
  if currInterval == null
    currInterval = segment.getInterval()
  else 
    currInterval = currInterval.withEnd(segment.getEnd())
  for each shard of current segment
    if shard.getSize() < optimal size
      shouldMergeNow = true
    currTotalSize += shard.getSize()

  if currTotalSize >= optimal size 
    if shouldMergeNow 
      // Note: I still need to think of a routine that determines the segment granularity for 
      // HadoopReindexTask so that the segments generated by it have evenly distributed sizes.
      submitHadoopReindexTask(dataSource, currInterval) 

    // at this point, we are certain segments in currInterval are balanced
    currTotalSize = 0
    currInterval = null
    shouldMergeNow = false
  else
    // do nothing. Since the segments seen so far don't have enough total size to make
    // a segment with size >= optimal size, we'll seek to merge them with the next
    // segment in the timeline

if currInterval != null  // this check is the same as currTotalSize > 0
  // To discuss: if the last chunk doesn't have enough size, what should we do about it?
  // Some options, 
  // 1. We don't do anything with it, simply wait for new segments coming in, and the 
  // next Coordinator's run will do the right thing.
  // 2. Find a way to merge it with a previous optimal segment
  // 3. ?

Basically, the algorithm iterates over each segment in the timeline and, for each shard in that segment, checks whether its size is below the optimal size; if so, we should definitely try to merge that shard with others. However, it's possible that the total size of the shards in that segment is not big enough to make an optimal-size segment, in which case we seek to merge the current segment with the next one in the timeline.
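For concreteness, here is a minimal runnable Java sketch of that scan; the Segment/Interval records are simplified stand-ins for illustration, not Druid's actual timeline classes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, self-contained sketch of the scan above; not Druid's real API.
public class UnbalancedSectionFinder
{
  // A segment is reduced to its interval endpoints (millis) and shard sizes (bytes).
  record Segment(long start, long end, long[] shardSizes) {}
  record Interval(long start, long end) {}

  // Returns the intervals that should be handed to a HadoopReindexTask.
  static List<Interval> findUnbalancedIntervals(List<Segment> timeline, long optimalSize)
  {
    List<Interval> result = new ArrayList<>();
    long currTotalSize = 0;
    Interval currInterval = null;
    boolean shouldMergeNow = false;

    for (Segment segment : timeline) {
      // expand the interval under consideration to cover this segment
      currInterval = (currInterval == null)
                     ? new Interval(segment.start(), segment.end())
                     : new Interval(currInterval.start(), segment.end());

      for (long shardSize : segment.shardSizes()) {
        if (shardSize < optimalSize) {
          shouldMergeNow = true;  // found an undersized (S) shard
        }
        currTotalSize += shardSize;
      }

      if (currTotalSize >= optimalSize) {
        if (shouldMergeNow) {
          result.add(currInterval);  // this section is unbalanced: reindex it
        }
        // segments in currInterval are now accounted for either way
        currTotalSize = 0;
        currInterval = null;
        shouldMergeNow = false;
      }
      // else: keep accumulating and try to merge with the next segment
    }
    // A leftover tail (currInterval != null) is left alone here; see the
    // open question about the last chunk in the pseudocode above.
    return result;
  }

  public static void main(String[] args)
  {
    // Three small segments followed by one that pushes the total over the
    // threshold: expect a single interval covering all four.
    List<Segment> timeline = List.of(
        new Segment(0, 10, new long[]{100}),
        new Segment(10, 20, new long[]{150}),
        new Segment(20, 30, new long[]{120}),
        new Segment(30, 40, new long[]{600})
    );
    System.out.println(findUnbalancedIntervals(timeline, 500));
    // prints one interval spanning start=0 to end=40
  }
}
```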

This algorithm should handle the cases below,

Case 1:
Segment 3 has three shards, two of which have sub-optimal sizes. In this case, we should reindex the interval that segment 3 covers, such that all its shards have optimal size.

Case 2:
Segments 5, 6, 7 are all small, so we continue iterating until we reach Segment 8; now the current total size is >= the optimal size, and we can submit a reindex task that covers 5, 6, 7, 8.

Case 3:
Similar to Case 2, Segment 9 has three shards, but the total size of its shards is still less than the optimal size, so we merge it with Segment 10.

Case 5: The total size of all the segments in history is not big enough; there is nothing we can do in this case, we just have to wait for new segments to come in so that we have enough size.

Case 6: Segment 13 is left over; should we merge it with the previous segment or wait for new segments to come in?

Case 7: ?

@guobingkun
Contributor

Tracked by #1998

guobingkun pushed a commit to guobingkun/druid that referenced this issue Mar 25, 2016
Fixes apache#1517

Deprecates druid.coordinator.merge.on. Instead, users will use druid.coordinator.merge.strategy for segment merging.

In this PR, `DruidCoordinatorHadoopSegmentMerger` finds small segments in the segment timeline and submits a HadoopIndexTask to reindex those imbalanced segments.

For example,
Here is a timeline of segments for a specific dataSource (note: the segment granularity of each segment could be arbitrary, and there might also be a gap between two segments):
```
SegID:    1   2   3   4   5   6   7   8   9  10  11  12   13
Shard0: |_O_|_O_|_O_|_O_|_S_|_S_|_S_|_O_|_S_|_O_|_S_|_O_|_S_|__|__|__|__|__|__|__|
Shard1:         |_S_|                   |_S_|   |_S_|_S_|
Shard2:         |_S_|                   |_S_|   |_S_|

S: the size of shard is below configured optimal size
O: the size of shard >= configured optimal size
```
As `DruidCoordinatorHadoopSegmentMerger` runs periodically, eventually there will be no shards in the timeline that are tagged with S.

Here is the algorithm `DruidCoordinatorHadoopSegmentMerger` implements for finding imbalanced segments and submitting the reindex task:

```
unbalancedIntervals = []
currTotalSize = 0
intervalToReindex = null
shouldBeMerged = false  // whether we should reindex segments within intervalToReindex

// the direction of scanning will alternate if we don't want to merge segments
// that have gaps between them
for each segment

  expand intervalToReindex with the interval of current segment

  for each shard of current segment
    if shard.getSize() < optimal size
      shouldBeMerged = true // found a small segment
    currTotalSize += shard.getSize()

  // estimate of whether reindexing the segments within intervalToReindex
  // would yield a good-size segment
  if currTotalSize >= optimal size
    if shouldBeMerged
      unbalancedIntervals.add(intervalToReindex)
    currTotalSize = 0
    intervalToReindex = null
    shouldBeMerged = false
  else
    // do nothing. Since we don't have enough total size yet to make a segment
    // with size >= optimal size, we'll seek to merge with the next segment in the timeline

if unbalancedIntervals is not empty:
  submit HadoopIndexTask with unbalancedIntervals

if we want to keep gaps between segments during the merge
  invert the scan direction, so that the next Coordinator run will scan
  the segments in the opposite direction

// Note: we need to invert the scan direction when keepGap is enabled because
// otherwise a small segment sitting at the boundary of a gap could be left
// over and never get merged.

```

Basically, the algorithm iterates over each segment in the timeline and, for each shard in that segment, checks whether its size is below the optimal size; if so, we should definitely try to merge that shard with others. However, it's possible that the total size of the shards in that segment is not big enough to make an optimal-size segment, in which case we seek to merge the current segment with the next one in the timeline.
Note: If we want to keep gaps between segments that don't abut, it's possible to have small segments left over; however, we can invert the scan direction in the next Coordinator run to eliminate those small segments.
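To make the direction-flipping idea concrete, here is a small hedged sketch (hypothetical names, building on the finder sketched earlier; not the actual DruidCoordinatorHadoopSegmentMerger API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the alternating-direction scan.
public class AlternatingScanMerger
{
  private final boolean keepGap;          // don't merge across gaps between segments
  private boolean scanBackwards = false;  // flips between Coordinator runs

  public AlternatingScanMerger(boolean keepGap)
  {
    this.keepGap = keepGap;
  }

  // Called once per Coordinator run with segment IDs in timeline order.
  public void run(List<String> timelineInOrder)
  {
    List<String> scanOrder = new ArrayList<>(timelineInOrder);
    if (scanBackwards) {
      Collections.reverse(scanOrder);
    }

    // ... scan scanOrder for unbalanced intervals (as in the earlier sketch,
    // but stopping accumulation at gaps when keepGap is set) and submit a
    // HadoopIndexTask for any intervals found ...

    if (keepGap) {
      // Flip direction so that a small segment stranded at one edge of a gap
      // on this pass can be picked up from the other side on the next pass.
      scanBackwards = !scanBackwards;
    }
  }
}
```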
guobingkun pushed a commit to guobingkun/druid that referenced this issue Mar 30, 2016 (same commit message as above)
@stale

stale bot commented Jun 21, 2019

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

stale bot added the stale label Jun 21, 2019
@stale

stale bot commented Jul 5, 2019

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.

stale bot closed this as completed Jul 5, 2019