
Refactor index merging, replace Rowboats with RowIterators and RowPointers #5335

Merged: 27 commits, Apr 28, 2018

Conversation

leventov
Member

@leventov leventov commented Feb 2, 2018

What this PR does is explained in this issue: #4622.

Key interfaces added in this PR are TimeAndDimsIterator, TimeAndDimsPointer, RowIterator and RowPointer. They have very elaborate Javadocs; please read them to get a better idea of this PR.

Its performance effect is not as good as I expected:

BEFORE
Benchmark                    (numSegments)  (rollup)  (rowsPerSegment)  (schema)  Mode  Cnt        Score       Error  Units
IndexMergeBenchmark.mergeV9              5      true             75000     basic  avgt   25  3209755.018 ± 44128.873  us/op
IndexMergeBenchmark.mergeV9              5     false             75000     basic  avgt   25  3123278.657 ± 37920.421  us/op

AFTER
Benchmark                    (numSegments)  (rollup)  (rowsPerSegment)  (schema)  Mode  Cnt        Score       Error  Units
IndexMergeBenchmark.mergeV9              5      true             75000     basic  avgt   25  3302141.747 ± 31396.513  us/op
IndexMergeBenchmark.mergeV9              5     false             75000     basic  avgt   25  3115059.181 ± 36265.165  us/op

I.e. it makes index merging with rollup 3% slower. This is because this PR only starts to become really beneficial once garbage-free object metric ser/de is implemented, as mentioned in #5172. I wanted to do that before publishing this PR, but it turned out to be a Pandora's box and may take several months to complete. So I decided to publish this PR even though it doesn't improve performance yet, because the work is already done and I don't want to re-do it when master diverges.

Another reason why this PR doesn't seem so appealing yet is that the current way of storing data in the incremental index (dims is an array of objects; e.g. a double-typed dimension is stored as a Double object) doesn't penalize the current way of merging indexes (explained in #4622). I.e. merging just moves a Double object from one place to another; no new objects are created.

However, I'm going to replace the current Object[]-based way of storing data in the incremental index with one based on ByteBuffers, to reduce the memory footprint (and finally remove Aggregators, leaving only BufferAggregators that could work in both realtime and historical). Once that is done, the current way of index merging would need to create a lot of new objects, while the new index merging structures presented in this PR would be unaffected.

@leventov
Member Author

leventov commented Feb 5, 2018

I tagged Design Review because I want more people to learn about the new logic of index merging via review.

@leventov
Member Author

leventov commented Feb 5, 2018

Tagged Bug because previously, events without dimensions ({"index":100}) and events with an empty string dimension ({"market":"", "index":100}) were not coalesced, which seems to have been wrong. Those events can be found in druid.sample.json. See changes in SchemalessTestFullTest and SchemalessTestSimpleTest.

*/
EncodedKeyComponentType convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent(EncodedKeyComponentType key);

ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues);
Member Author

@jon-wei could you please help to create a good javadoc for this method? I forgot why we need to sort dimension values before index merging.

Member Author

Or @gianm?

Contributor

I don't remember either. Definitely they must be sorted after merging (dictionaries need to be sorted) but I forget if there is a specific reason why they need to be sorted before merging.

Contributor

Sorry, just saw this comment!

The writeDimValueAndSetupDimConversion step, which builds the merged dictionary across indexes and the conversion buffers used for converting per-index dictionary IDs to merged dictionary IDs, builds those conversions from each index's sorted dictionary, so the Rowboat iterable returned by IncrementalIndexAdapter also needs to use the sorted ID space.

DictionaryMergeIterator uses a priority queue to merge the per-index dictionary values, so I guess that's why the sorted IDs are used now.

Member Author

Now I don't understand why merging is not broken, if it compares IndexedInts (in this PR; or int[], before) based only on sorted IDs, without looking up the original values. E.g.
Index1:
null -> 0,
apple -> 1,
banana -> 2.

Index2:
null -> 0,
watermelon -> 1.

Then it will merge the [watermelon] row from the second index before the [banana] row from the first index, because its sorted ID is lower.

Contributor

writeDimValueAndSetupDimConversion would look at the actual String values to create an int buffer for each index to be merged, used as a mapping from sorted index-specific IDs to sorted merged IDs.

Using that example, the dictionary building step would create the merged dictionary:
null -> 0
apple -> 1
banana -> 2
watermelon -> 3

As part of that process, a conversion buffer for each index to be merged would be generated:
Index 1:
0 -> 0 (null)
1 -> 1 (apple)
2 -> 2 (banana)

Index 2:
0 -> 0 (null)
1 -> 3 (watermelon)

These conversions are applied by MMappedIndexRowIterable before the rows are fed into the row merging function.
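
To make that mapping concrete, here is a minimal, self-contained sketch of the idea in plain Java (illustrative names only, not Druid's actual DictionaryMergeIterator or conversion-buffer code): given per-index sorted dictionaries, it builds the merged sorted dictionary and a per-index conversion table from local sorted IDs to merged sorted IDs.

import java.util.List;
import java.util.TreeSet;

// Illustrative only: builds a merged, sorted dictionary from several per-index sorted
// dictionaries, plus one conversion table per index mapping local sorted IDs to merged
// sorted IDs (the role played by the conversion buffers described above).
class DictionaryConversionSketch
{
  static int[][] buildConversions(List<List<String>> sortedDictionaries, List<String> mergedOut)
  {
    TreeSet<String> merged = new TreeSet<>();
    for (List<String> dictionary : sortedDictionaries) {
      merged.addAll(dictionary);
    }
    mergedOut.addAll(merged); // the merged dictionary, in sorted order

    int[][] conversions = new int[sortedDictionaries.size()][];
    for (int i = 0; i < sortedDictionaries.size(); i++) {
      List<String> dictionary = sortedDictionaries.get(i);
      int[] conversion = new int[dictionary.size()];
      for (int localId = 0; localId < dictionary.size(); localId++) {
        // local sorted ID -> position of the same value in the merged sorted dictionary
        conversion[localId] = mergedOut.indexOf(dictionary.get(localId));
      }
      conversions[i] = conversion;
    }
    return conversions;
  }
}

For index 1 = [apple, banana] and index 2 = [watermelon] (the null entry omitted for simplicity), this yields the merged dictionary [apple, banana, watermelon] and the conversion tables [0, 1] and [2], matching the mapping described above.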

Member Author

Ok, thanks. I see now. Updated Javadoc comments.

However, now that I clearly see the whole picture, I have another question... #5526

@leventov
Member Author

leventov commented Feb 5, 2018

This PR is ready for review. It's going to conflict heavily with #5278, so #5278 should be merged first, but review doesn't need to wait for that.

@leventov leventov added this to the 0.13.0 milestone Feb 5, 2018
@leventov
Member Author

@nishantmonu51 could you please review this?

@gianm
Contributor

gianm commented Feb 13, 2018

Another reason why this PR doesn't seem so appealing yet is that the current way of storing data in the incremental index (dims is an array of objects; e.g. a double-typed dimension is stored as a Double object) doesn't penalize the current way of merging indexes (explained in #4622). I.e. merging just moves a Double object from one place to another; no new objects are created.

However, I'm going to replace the current Object[]-based way of storing data in the incremental index with one based on ByteBuffers, to reduce the memory footprint (and finally remove Aggregators, leaving only BufferAggregators that could work in both realtime and historical). Once that is done, the current way of index merging would need to create a lot of new objects, while the new index merging structures presented in this PR would be unaffected.

@leventov - in this PR you write that it may take several months to complete the series of changes you proposed, but you have also recently written elsewhere that you are "probably going to stop being involved in [Druid] development soon". Forgive me for asking, but are you going to be an active contributor long enough to complete this series of contributions and help with follow up? I ask because right now this area of the code is quite stable, and may become less stable as a result of major changes. Improvements are good, but it's always best if the original author is around to fix bugs, help with problems that may arise, comment on why things were done certain ways, and so on.

For example one potential bug could be a performance bug -- I have done experiments with replacing all uses of Aggregator with BufferAggregator before, and ran into problems with timeseries queries slowing down and growable aggregators in IncrementalIndex taking more memory than expected (I don't remember the exact details, but IIRC, some aggregator in datasketches has a buffer version that allocates the max upfront but the non-buffer versions can start small). If you are using the same approach I did in those experiments then your approach might run into the same problems. But even if not, that's just one example -- there are other potential bugs that could occur too.

@leventov
Member Author

leventov commented Feb 13, 2018

@leventov - in this PR you write that it may take several months to complete the series of changes you proposed, but you have also recently written elsewhere that you are "probably going to stop being involved in [Druid] development soon". Forgive me for asking, but are you going to be an active contributor long enough to complete this series of contributions and help with follow up? I ask because right now this area of the code is quite stable, and may become less stable as a result of major changes.

The amount of work is not that big. I envision that it may take several months because, for example, it will require proposing some changes to https://github.com/DataSketches/memory, waiting for them to be merged (with PR review discussions, etc.), and then waiting until the library with the needed changes is released to Maven Central. After that (not in parallel), the same in https://github.com/RoaringBitmap/RoaringBitmap. After that, some PR(s) in Druid. There might be some delays along the way.

Another reason why it may take several months is that I'm going to reduce my involvement in Druid. But it doesn't take several months of full-time work.

Improvements are good, but it's always best if the original author is around to fix bugs, help with problems that may arise, comment on why things were done certain ways, and so on.

I'm definitely going to be available for such things.


For example one potential bug could be a performance bug -- I have done experiments with replacing all uses of Aggregator with BufferAggregator before, and ran into problems with timeseries queries slowing down and growable aggregators in IncrementalIndex taking more memory than expected (I don't remember the exact details, but IIRC, some aggregator in datasketches has a buffer version that allocates the max upfront but the non-buffer versions can start small). If you are using the same approach I did in those experiments then your approach might run into the same problems. But even if not, that's just one example -- there are other potential bugs that could occur too.

Thanks for pointing this out; I hadn't thought about it, but I now see that it could be a problem if all dimensions and metrics are required to be stored in a single inflexible (because of the shared base) ByteBuffer. A possible solution could be to allow certain Object metrics to be stored in separate (not shared, hence resizable) ByteBuffers: when a metric Object requires a resize, it just abandons the former heap buffer, allocates a new, bigger one, and stores the data in it. The API may look like:

interface AggregatorFactory {
  /** A return value of true designates that metrics aggregated with this factory need to be
      stored in separate buffers. */
  default boolean needsFlexibleAggregate() { return false; }
}

interface BufferAggregator {
  void aggregate(ByteBuffer buf, int position); // this method already exists

  /** If the aggregated object doesn't fit the given buffer (from position 0 to capacity),
      this method needs to allocate a new, bigger buffer and return it. */
  default ByteBuffer aggregateFlexible(ByteBuffer buf) {
    aggregate(buf, 0);
    return buf;
  }
}
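
To illustrate how an aggregator could use that contract, here is a hypothetical sketch implementing the two-method BufferAggregator variant proposed above (GrowableSketchBufferAggregator and requiredSize() are made-up names, not real Druid classes): when the state outgrows the current per-metric buffer, the aggregator copies it into a bigger heap buffer and returns that buffer for the caller to keep.

import java.nio.ByteBuffer;

// Hypothetical implementation against the sketch interface above, purely to illustrate
// the proposed aggregateFlexible() contract.
class GrowableSketchBufferAggregator implements BufferAggregator
{
  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    // update the aggregation state stored at `position` in `buf` (details omitted)
  }

  @Override
  public ByteBuffer aggregateFlexible(ByteBuffer buf)
  {
    if (requiredSize() > buf.capacity()) {
      // Too small: abandon the old heap buffer, copy the state into a bigger one,
      // and return the bigger one so the caller stores it from now on.
      ByteBuffer bigger = ByteBuffer.allocate(Math.max(requiredSize(), buf.capacity() * 2));
      buf.rewind();
      bigger.put(buf);
      bigger.rewind();
      buf = bigger;
    }
    aggregate(buf, 0);
    return buf;
  }

  // Made-up helper: the size the serialized state would need after this aggregation.
  private int requiredSize()
  {
    return 64;
  }
}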

@nishantmonu51
Member

@leventov I am on vacation this week; I will start reviewing this next week.

@jihoonson
Contributor

I will start to review in a few days.

Contributor

@jihoonson left a comment

Reviewed up to ForwardingRowIterator.

return IndexMergerV9.createDoubleColumnSerializer(segmentWriteOutMedium, dimensionName, indexSpec);
Contributor

Looks like IndexMergerV9.createDoubleColumnSerializer() can be moved to DoubleDimensionHandler, and it would probably be better because type-specific handling could then be done only in that class. Same for other types. What do you think?

Member Author

I don't have a definite opinion. Your point makes sense, but it also makes sense to keep those methods (for Double, Float, and Long) together. I would not change this here, because it's out of the scope of this PR; those methods weren't put into IndexMergerV9 by this PR.

Contributor

Ok.

@fjy
Contributor

fjy commented Mar 21, 2018

I strongly think we need to be really careful with a large refactor of the ingestion code that doesn't seem to add much of a performance improvement, and carefully test that it doesn't break anything. My vote is not to merge this code unless we are confident about that.

@leventov
Member Author

@fjy this PR actually fixes a bug in index merging code.

@leventov
Member Author

@fjy I feel that the current unit and integration test suites cover index merging well. It wasn't the case that I wrote some code and it immediately passed all tests, far from it. I was actually surprised by how many subtle corner cases the unit tests revealed (and I had to fix my new code accordingly).

This PR doesn't improve performance directly, but it's part of a larger plan that will, for instance, allow getting rid of the Aggregator interface (with the current merging scheme, that can't be done reasonably efficiently). I explained that in detail above in this PR's comments and in #5335.

And on top of that, this PR fixes a bug.

* In other words, MergingRowIterator is an equivalent to {@link com.google.common.collect.Iterators#mergeSorted}, but
* for {@link RowIterator}s rather than simple {@link java.util.Iterator}s.
*
* Implementation detail: this class uses binary heap priority queue algorithm to sort pointers, but it also momoizes
Member

s/momoizes/memoizes/g

Member Author

Fixed
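
For reference, here is a minimal sketch of the k-way merge pattern that the Javadoc above compares MergingRowIterator to, written with plain java.util.Iterators and a binary-heap PriorityQueue (illustrative only, not Druid's RowIterator/RowPointer API):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative k-way merge of already-sorted iterators, analogous to Guava's
// Iterators.mergeSorted; MergingRowIterator applies the same heap-based idea to
// RowIterators and RowPointers instead of plain Iterators.
final class MergeSortedSketch
{
  static <T extends Comparable<T>> List<T> mergeSorted(List<Iterator<T>> iterators)
  {
    // Each heap entry holds the current element of one input iterator.
    PriorityQueue<Entry<T>> heap = new PriorityQueue<>(Comparator.comparing((Entry<T> e) -> e.value));
    for (Iterator<T> iterator : iterators) {
      if (iterator.hasNext()) {
        heap.add(new Entry<>(iterator.next(), iterator));
      }
    }
    List<T> result = new ArrayList<>();
    while (!heap.isEmpty()) {
      Entry<T> smallest = heap.poll();
      result.add(smallest.value);
      if (smallest.source.hasNext()) {
        // Advance the iterator the smallest element came from and re-insert its next element.
        heap.add(new Entry<>(smallest.source.next(), smallest.source));
      }
    }
    return result;
  }

  private static final class Entry<T>
  {
    final T value;
    final Iterator<T> source;

    Entry(T value, Iterator<T> source)
    {
      this.value = value;
      this.source = source;
    }
  }
}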


/**
* On {@link #moveToNext()} and {@link #mark()}, this class copies all column values into a set of {@link
* SettableColumnValueSelector} instances. Alternative approach was to save only offset in column and use they same
Member

s/they same/the same/g

Member Author

Fixed

public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
Member

note to self: reviewed till here.

@nishantmonu51
Member

Hi @leventov, please summarize the changes in the PR description, especially the new concepts/interfaces introduced and their purpose; that would make the review easier.

Also, can you list all the future changes that are needed to make this PR useful and produce good results?

Finally, is this code tested anywhere in production? If not, can we get the branch tested on a test cluster to gain more confidence here?

@leventov
Member Author

leventov commented Apr 9, 2018

please summarize the changes in the PR description, especially the new concepts/interfaces introduced and their purpose; that would make the review easier.

I've added a second paragraph to the first message in this PR. You should basically read the Javadocs of the four added classes and interfaces; I don't see a point in copy-pasting the Javadoc here.

can you list all the future changes that are needed to make this PR useful and produce good results?

There is the ObjectStrategy refactoring; see the first message in this thread:

I.e. it makes index merging with rollup 3% slower. This is because this PR only starts to become really beneficial once garbage-free object metric ser/de is implemented, as mentioned in #5172. I wanted to do that before publishing this PR, but it turned out to be a Pandora's box and may take several months to complete. So I decided to publish this PR even though it doesn't improve performance yet, because the work is already done and I don't want to re-do it when master diverges.

Some more info: apache/datasketches-memory#14 (comment). It's already 70% done (and has been for several months); it awaits this PR and also some changes in DataSketches/memory. It will allow drastically reducing the amount of garbage produced when Object columns (like histogram or dataSketches) are involved in index merging.

Finally, is this code tested anywhere in production? If not, can we get the branch tested on a test cluster to gain more confidence here?

No, it's not used in production. I'm not going to test this change in production before it's part of a release, because we have moved away from that practice. Also see #5335 (comment)

@leventov
Member Author

@nishantmonu51 do you have more comments?

@nishantmonu51
Member

LGTM. 👍

@nishantmonu51
Member

@fjy: do you still have concerns about merging this PR?

@fjy
Contributor

fjy commented Apr 28, 2018

👍

@jihoonson jihoonson merged commit 9be0007 into apache:master Apr 28, 2018
@leventov
Member Author

Thanks for the reviews.

@leventov leventov deleted the index-merge-no-garbage branch April 29, 2018 13:34
sathishsri88 pushed a commit to sathishs/druid that referenced this pull request May 8, 2018
Refactor index merging, replace Rowboats with RowIterators and RowPointers (apache#5335)

* Refactor index merging, replace Rowboats with RowIterators and RowPointers

* Add javadocs

* Fix a bug in QueryableIndexIndexableAdapter

* Fixes

* Remove unused declarations

* Remove unused GenericColumn.isNull() method

* Fix test

* Address comments

* Rearrange some code in MergingRowIterator for more clarity

* Self-review

* Fix style

* Improve docs

* Fix docs

* Rename IndexMergerV9.writeDimValueAndSetupDimConversion to setUpDimConversion()

* Update Javadocs

* Minor fixes

* Doc fixes, more code comments, cleanup of RowCombiningTimeAndDimsIterator

* Fix doc link