Refactor index merging, replace Rowboats with RowIterators and RowPointers #5335

Merged · 27 commits · Apr 28, 2018

Commits
3e252e9
Refactor index merging, replace Rowboats with RowIterators and RowPoi…
leventov Feb 2, 2018
86e1c09
Add javadocs
leventov Feb 3, 2018
28ba4b5
Fix a bug in QueryableIndexIndexableAdapter
leventov Feb 3, 2018
6e66482
Fixes
leventov Feb 3, 2018
590b110
Remove unused declarations
leventov Feb 5, 2018
eb158ca
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Feb 9, 2018
f51ffde
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Feb 21, 2018
77c02fc
Remove unused GenericColumn.isNull() method
leventov Feb 21, 2018
f7e9a26
Merge branch 'master' of github.com:druid-io/druid into index-merge-n…
leventov Feb 27, 2018
111cb86
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Mar 1, 2018
625b7a2
Fix test
leventov Mar 1, 2018
a94f874
Address comments
leventov Mar 1, 2018
ec894f6
Rearrange some code in MergingRowIterator for more clarity
leventov Mar 2, 2018
913de3a
Self-review
leventov Mar 2, 2018
23d9a83
Fix style
leventov Mar 2, 2018
c8ae34a
Improve docs
leventov Mar 9, 2018
5b667ce
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Mar 13, 2018
3b9670a
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Mar 19, 2018
ad2c2f5
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Mar 21, 2018
ac448b4
Fix docs
leventov Mar 21, 2018
70c1c98
Rename IndexMergerV9.writeDimValueAndSetupDimConversion to setUpDimCo…
leventov Mar 21, 2018
e1434fc
Update Javadocs
leventov Mar 23, 2018
56b2a5f
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Apr 6, 2018
28ee32b
Merge remote-tracking branch 'upstream/master' into index-merge-no-ga…
leventov Apr 9, 2018
15d1c35
Minor fixes
leventov Apr 9, 2018
cfec46a
Doc fixes, more code comments, cleanup of RowCombiningTimeAndDimsIter…
leventov Apr 9, 2018
99c7e8a
Fix doc link
leventov Apr 9, 2018
api/src/main/java/io/druid/data/input/Row.java (2 changes: 1 addition, 1 deletion)
@@ -65,7 +65,7 @@ public interface Row extends Comparable<Row>

   /**
    * Returns the raw dimension value for the given column name. This is different from {@link #getDimension} which
-   * all values to strings before returning them.
+   * converts all values to strings before returning them.
    *
    * @param dimension the column name of the dimension requested
    *
@@ -19,8 +19,6 @@

 package io.druid.benchmark;

-// Run FloatCompressionBenchmarkFileGenerator to generate the required files before running this benchmark
-
 import com.google.common.base.Supplier;
 import com.google.common.io.Files;
 import io.druid.segment.data.ColumnarFloats;
@@ -44,6 +44,9 @@
 import java.util.Random;
 import java.util.concurrent.TimeUnit;

+/**
+ * Run {@link FloatCompressionBenchmarkFileGenerator} to generate the required files before running this benchmark
+ */
 @State(Scope.Benchmark)
 @Fork(value = 1)
 @Warmup(iterations = 10)
@@ -42,8 +42,9 @@
 import java.util.Random;
 import java.util.concurrent.TimeUnit;

-// Run LongCompressionBenchmarkFileGenerator to generate the required files before running this benchmark
-
+/**
+ * Run {@link LongCompressionBenchmarkFileGenerator} to generate the required files before running this benchmark
+ */
 @State(Scope.Benchmark)
 @Fork(value = 1)
 @Warmup(iterations = 10)
@@ -45,7 +45,7 @@
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.QueryableIndexIndexableAdapter;
-import io.druid.segment.Rowboat;
+import io.druid.segment.RowIterator;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.timeline.DataSegment;
@@ -309,11 +309,11 @@ private void verifyJob(IndexGeneratorJob job) throws IOException
       QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir);
       QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);

-      for (Rowboat row : adapter.getRows()) {
-        Object[] metrics = row.getMetrics();
-
-        rowCount++;
-        Assert.assertTrue(metrics.length == 2);
+      try (RowIterator rowIt = adapter.getRows()) {
+        while (rowIt.moveToNext()) {
+          rowCount++;
+          Assert.assertEquals(2, rowIt.getPointer().getNumMetrics());
+        }
       }
     }
     Assert.assertEquals(rowCount, data.size());
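The hunk above is the core pattern of this PR: instead of materializing a Rowboat object per row, callers walk a closeable RowIterator and read the current row through its RowPointer. A minimal standalone sketch, assuming only what this diff shows (IndexableAdapter#getRows() returning a RowIterator, moveToNext(), getPointer(), getNumMetrics()) and that the pointer is a reused view onto the current row rather than a per-row allocation, which appears to be the point of dropping Rowboat:

```java
import io.druid.segment.IndexableAdapter;
import io.druid.segment.RowIterator;
import io.druid.segment.RowPointer;

import java.io.IOException;

final class RowIterationSketch
{
  /** Counts rows with exactly two metrics, in the style of the updated test. */
  static int countTwoMetricRows(IndexableAdapter adapter) throws IOException
  {
    int count = 0;
    // The iterator starts positioned before the first row; moveToNext() both
    // advances and reports whether another row exists.
    try (RowIterator rowIterator = adapter.getRows()) {
      while (rowIterator.moveToNext()) {
        RowPointer pointer = rowIterator.getPointer();
        // Read from the pointer before the next moveToNext() call: it is a
        // view onto the iterator's current position, not a snapshot.
        if (pointer.getNumMetrics() == 2) {
          count++;
        }
      }
    }
    return count;
  }
}
```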
@@ -254,8 +254,7 @@ public void buildingSketchesAtIngestionTime() throws Exception
     double[] histogram = (double[]) histogramObject;
     Assert.assertEquals(4, histogram.length);
     for (final double bin : histogram) {
-      Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly
-      // distributed into 4 bins
+      Assert.assertEquals(100, bin, 100 * 0.2); // 400 items uniformly distributed into 4 bins
     }
   }

@@ -306,14 +306,7 @@ public static final SerializeResult toBytes(
     }

     //writing all metrics
-    Supplier<InputRow> supplier = new Supplier<InputRow>()
-    {
-      @Override
-      public InputRow get()
-      {
-        return row;
-      }
-    };
+    Supplier<InputRow> supplier = () -> row;
     WritableUtils.writeVInt(out, aggs.length);
     for (AggregatorFactory aggFactory : aggs) {
       String k = aggFactory.getName();
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import io.druid.indexing.common.TaskToolbox;
@@ -35,8 +34,8 @@
 import io.druid.segment.IndexSpec;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.QueryableIndexIndexableAdapter;
-import io.druid.segment.Rowboat;
-import io.druid.segment.RowboatFilteringIndexAdapter;
+import io.druid.segment.RowFilteringIndexAdapter;
+import io.druid.segment.RowPointer;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
@@ -120,18 +119,9 @@ public SegmentToMergeHolder apply(PartitionChunk<DataSegment> chunkInput)
     List<IndexableAdapter> adapters = Lists.newArrayList();
     for (final SegmentToMergeHolder holder : segmentsToMerge) {
       adapters.add(
-          new RowboatFilteringIndexAdapter(
-              new QueryableIndexIndexableAdapter(
-                  toolbox.getIndexIO().loadIndex(holder.getFile())
-              ),
-              new Predicate<Rowboat>()
-              {
-                @Override
-                public boolean apply(Rowboat input)
-                {
-                  return holder.getInterval().contains(input.getTimestamp());
-                }
-              }
+          new RowFilteringIndexAdapter(
+              new QueryableIndexIndexableAdapter(toolbox.getIndexIO().loadIndex(holder.getFile())),
+              (RowPointer rowPointer) -> holder.getInterval().contains(rowPointer.getTimestamp())
           )
       );
     }
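Pulled out of the task for readability, the same construction as a standalone sketch, assuming (as the diff suggests) that RowFilteringIndexAdapter accepts any IndexableAdapter plus a predicate over RowPointer:

```java
import io.druid.segment.IndexableAdapter;
import io.druid.segment.RowFilteringIndexAdapter;
import io.druid.segment.RowPointer;
import org.joda.time.Interval;

final class IntervalFilteringSketch
{
  /**
   * Wraps an adapter so that downstream merging only sees rows whose
   * timestamps fall inside the given interval; the lambda replaces the old
   * Predicate<Rowboat> anonymous class.
   */
  static IndexableAdapter filteredToInterval(IndexableAdapter adapter, Interval interval)
  {
    return new RowFilteringIndexAdapter(
        adapter,
        (RowPointer rowPointer) -> interval.contains(rowPointer.getTimestamp())
    );
  }
}
```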
@@ -70,7 +70,6 @@ public abstract class AggregatorFactory implements Cacheable
    * @see AggregateCombiner
    * @see io.druid.segment.IndexMerger
    */
-  @SuppressWarnings("unused") // Going to be used when https://github.com/druid-io/druid/projects/2 is complete
   public AggregateCombiner makeAggregateCombiner()
   {
     throw new UOE("[%s] does not implement makeAggregateCombiner()", this.getClass().getName());
@@ -28,6 +28,12 @@
 import javax.annotation.Nullable;

 /**
+ * An ExtractionFn is a function that can be used to transform the values of a column (typically a dimension).
+ * Note that ExtractionFn implementations are expected to be Threadsafe.
+ *
+ * A simple example of the type of operation this enables is the RegexDimExtractionFn which applies a
+ * regular expression with a capture group. When the regular expression matches the value of a dimension,
+ * the value captured by the group is used for grouping operations instead of the dimension value.
  */
 @ExtensionPoint
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -48,14 +54,6 @@
   @JsonSubTypes.Type(name = "bucket", value = BucketExtractionFn.class),
   @JsonSubTypes.Type(name = "strlen", value = StrlenExtractionFn.class)
 })
-/**
- * An ExtractionFn is a function that can be used to transform the values of a column (typically a dimension).
- * Note that ExtractionFn implementations are expected to be Threadsafe.
- *
- * A simple example of the type of operation this enables is the RegexDimExtractionFn which applies a
- * regular expression with a capture group. When the regular expression matches the value of a dimension,
- * the value captured by the group is used for grouping operations instead of the dimension value.
- */
 public interface ExtractionFn extends Cacheable
 {
   /**
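To make the relocated javadoc's RegexDimExtractionFn example concrete, here is the capture-group idea in plain java.util.regex; this illustrates the concept only and is not the Druid class itself:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CaptureGroupSketch
{
  /**
   * When the pattern matches, the text captured by group 1 stands in for the
   * raw dimension value; otherwise the value passes through unchanged.
   */
  static String extract(String dimensionValue, Pattern pattern)
  {
    Matcher matcher = pattern.matcher(dimensionValue);
    return matcher.find() ? matcher.group(1) : dimensionValue;
  }

  public static void main(String[] args)
  {
    Pattern firstPathSegment = Pattern.compile("^/([^/]+)/");
    // Prints "wiki": "/wiki/Main_Page" groups under "wiki" rather than the full path.
    System.out.println(extract("/wiki/Main_Page", firstPathSegment));
  }
}
```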
@@ -211,6 +211,7 @@ public VirtualColumns getVirtualColumns()
     return virtualColumns;
   }

+  @Nullable
   @JsonProperty("filter")
   public DimFilter getDimFilter()
   {
@@ -37,4 +37,5 @@
 public interface ColumnValueSelector<T> extends BaseLongColumnValueSelector, BaseDoubleColumnValueSelector,
     BaseFloatColumnValueSelector, BaseObjectColumnValueSelector<T>
 {
+  ColumnValueSelector[] EMPTY_ARRAY = new ColumnValueSelector[0];
 }
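The new constant is the standard allocation-free idiom for Collection.toArray. A sketch of the likely intended use at call sites; the call site itself is an assumption, not something shown in this diff:

```java
import io.druid.segment.ColumnValueSelector;

import java.util.List;

final class EmptyArraySketch
{
  static ColumnValueSelector[] toArray(List<ColumnValueSelector> selectors)
  {
    // Passing a shared zero-length array lets the JDK allocate the correctly
    // sized result without a throwaway sizing argument on every call.
    return selectors.toArray(ColumnValueSelector.EMPTY_ARRAY);
  }
}
```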
processing/src/main/java/io/druid/segment/Cursor.java (9 changes: 8 additions, 1 deletion)
@@ -22,8 +22,15 @@
 import org.joda.time.DateTime;

 /**
+ * Cursor is an interface for iteration over a range of data points, used during query execution. {@link
+ * QueryableIndexStorageAdapter.QueryableIndexCursor} is an implementation for historical segments, and {@link
+ * io.druid.segment.incremental.IncrementalIndexStorageAdapter.IncrementalIndexCursor} is an implementation for {@link
+ * io.druid.segment.incremental.IncrementalIndex}.
+ *
+ * Cursor is conceptually similar to {@link TimeAndDimsPointer}, but the latter is used for historical segment creation
+ * rather than query execution (as Cursor). If those abstractions could be collapsed (and if it is worthwhile) is yet to
+ * be determined.
  */
-
 public interface Cursor
 {
   ColumnSelectorFactory getColumnSelectorFactory();
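For contrast with TimeAndDimsPointer, the query-time loop the new javadoc describes looks roughly like the following. This is a sketch assuming Cursor's usual advance()/isDone() methods and a makeColumnValueSelector factory method on ColumnSelectorFactory, neither of which appears in this hunk:

```java
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.Cursor;

final class CursorScanSketch
{
  /** Sums a numeric column over every row in the cursor's range. */
  static long sumLongColumn(Cursor cursor, String columnName)
  {
    // Selectors are created once up front; advancing the cursor repositions
    // them onto the next row.
    ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
    ColumnValueSelector<?> selector = factory.makeColumnValueSelector(columnName);
    long sum = 0;
    while (!cursor.isDone()) {
      sum += selector.getLong();
      cursor.advance();
    }
    return sum;
  }
}
```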
processing/src/main/java/io/druid/segment/DimensionHandler.java (87 changes: 13 additions, 74 deletions)
@@ -20,12 +20,11 @@
 package io.druid.segment;

 import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
-import io.druid.segment.column.Column;
 import io.druid.segment.column.ColumnCapabilities;
-import io.druid.segment.data.Indexed;
+import io.druid.segment.selector.settable.SettableColumnValueSelector;
 import io.druid.segment.writeout.SegmentWriteOutMedium;

-import java.io.Closeable;
+import java.util.Comparator;

 /**
  * Processing related interface
@@ -40,10 +39,8 @@
  *
  * This interface allows type-specific behavior column logic, such as choice of indexing structures and disk formats.
  * to be contained within a type-specific set of handler objects, simplifying processing classes
- * such as IncrementalIndex and IndexMerger and allowing for abstracted development of additional dimension types.
- *
- * A dimension may have two representations, an encoded representation and a actual representation.
- * For example, a value for a String dimension has an integer dictionary encoding, and an actual String representation.
+ * such as {@link io.druid.segment.incremental.IncrementalIndex} and {@link IndexMerger} and allowing for abstracted
+ * development of additional dimension types.
  *
  * A DimensionHandler is a stateless object, and thus thread-safe; its methods should be pure functions.
  *
@@ -85,7 +82,6 @@ default MultiValueHandling getMultivalueHandling()
    */
   DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType> makeIndexer();

-
   /**
    * Creates a new DimensionMergerV9, a per-dimension object responsible for merging indexes/row data across segments
    * and building the on-disk representation of a dimension. For use with IndexMergerV9 only.
@@ -99,14 +95,13 @@

  * @return A new DimensionMergerV9 object.
  */
-  DimensionMergerV9<EncodedKeyComponentType> makeMerger(
+  DimensionMergerV9 makeMerger(
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress
   );

-
   /**
    * Given an key component representing a single set of row value(s) for this dimension as an Object,
    * return the length of the key component after appropriate type-casting.
@@ -119,73 +114,17 @@ DimensionMergerV9<EncodedKeyComponentType> makeMerger(
    */
   int getLengthOfEncodedKeyComponent(EncodedKeyComponentType dimVals);

-
   /**
-   * Given two key components representing sorted encoded row value(s), return the result of their comparison.
-   *
-   * If the two key components have different lengths, the shorter component should be ordered first in the comparison.
-   *
-   * Otherwise, this function should iterate through the key components and return the comparison of the
-   * first difference.
-   *
-   * For dimensions that do not support multivalue rows, lhs and rhs can be compared directly.
-   *
-   * @param lhs key component from a row
-   * @param rhs key component from a row
-   *
-   * @return integer indicating comparison result of key components
-   */
-  int compareSortedEncodedKeyComponents(EncodedKeyComponentType lhs, EncodedKeyComponentType rhs);
-
-
-  /**
-   * Given two key components representing sorted encoded row value(s), check that the two key components
-   * have the same encoded values, or if the encoded values differ, that they translate into the same actual values,
-   * using the mappings provided by lhsEncodings and rhsEncodings (if applicable).
-   *
-   * If validation fails, this method should throw a SegmentValidationException.
-   *
-   * Used by IndexIO for validating segments.
-   *
-   * See StringDimensionHandler.validateSortedEncodedKeyComponents() for a reference implementation.
-   *
-   * @param lhs key component from a row
-   * @param rhs key component from a row
-   * @param lhsEncodings encoding lookup from lhs's segment, null if not applicable for this dimension's type
-   * @param rhsEncodings encoding lookup from rhs's segment, null if not applicable for this dimension's type
+   * Returns a comparator that knows how to compare {@link ColumnValueSelector} of the assumed dimension type,
+   * corresponding to this DimensionHandler. E. g. {@link StringDimensionHandler} returns a comparator, that compares
+   * {@link ColumnValueSelector}s as {@link DimensionSelector}s.
    */
-  void validateSortedEncodedKeyComponents(
-      EncodedKeyComponentType lhs,
-      EncodedKeyComponentType rhs,
-      Indexed<ActualType> lhsEncodings,
-      Indexed<ActualType> rhsEncodings
-  ) throws SegmentValidationException;
-
+  Comparator<ColumnValueSelector> getEncodedValueSelectorComparator();

   /**
-   * Given a Column, return a type-specific object that can be used to retrieve row values.
-   *
-   * For example:
-   * - A String-typed implementation would return the result of column.getDictionaryEncoding()
-   * - A long-typed implemention would return the result of column.getGenericColumn().
-   *
-   * @param column Column for this dimension from a QueryableIndex
-   * @return The type-specific column subobject for this dimension.
-   */
-  Closeable getSubColumn(Column column);
-
-
-  /**
-   * Given a subcolumn from getSubColumn, and the index of the current row, retrieve a dimension's values
-   * from a row as an EncodedKeyComponentType.
-   *
-   * For example:
-   * - A String-typed implementation would read the current row from a DictionaryEncodedColumn as an int[].
-   * - A long-typed implemention would read the current row from a GenericColumn and return a Long.
-   *
-   * @param column Column for this dimension from a QueryableIndex
-   * @param currRow The index of the row to retrieve
-   * @return The key component for this dimension from the current row of the column.
+   * Creates and returns a new object of some implementation of {@link SettableColumnValueSelector}, that corresponds
+   * to the type of this DimensionHandler. E. g. {@link LongDimensionHandler} returns {@link
+   * io.druid.segment.selector.settable.SettableLongColumnValueSelector}, etc.
    */
-  EncodedKeyComponentType getEncodedKeyComponentFromColumn(Closeable column, int currRow);
+  SettableColumnValueSelector makeNewSettableEncodedValueSelector();
 }
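Taken together, the two replacement methods let merging code stay type-agnostic: a handler hands out settable selectors for snapshotting row values, plus a comparator that knows how to order them. A sketch under the assumption that SettableColumnValueSelector can copy the current value of another selector; the setValueFrom method name is hypothetical here, not confirmed by this diff:

```java
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.DimensionHandler;
import io.druid.segment.selector.settable.SettableColumnValueSelector;

final class DimensionComparatorSketch
{
  /**
   * Snapshots the current row value from each side and compares them with the
   * handler's type-aware comparator, so the caller needs no per-type branching.
   */
  static int compareCurrentRows(
      DimensionHandler<?, ?, ?> handler,
      ColumnValueSelector leftRow,
      ColumnValueSelector rightRow
  )
  {
    SettableColumnValueSelector left = handler.makeNewSettableEncodedValueSelector();
    SettableColumnValueSelector right = handler.makeNewSettableEncodedValueSelector();
    left.setValueFrom(leftRow);   // hypothetical copy method, see lead-in
    right.setValueFrom(rightRow);
    return handler.getEncodedValueSelectorComparator().compare(left, right);
  }
}
```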