
Commit aaa8a88
Merge pull request #1739 from jon-wei/segment_realtime
Allow SegmentAnalyzer to read columns from StorageAdapter, allow SegmentMetadataQuery to query IncrementalIndexSegments on realtime node
gianm committed Sep 18, 2015
2 parents: ea2a773 + 367c50d
Showing 7 changed files with 166 additions and 25 deletions.
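
In effect, segment-metadata analysis now covers both persisted and purely in-memory segments. A minimal sketch of the dispatch this enables (a hypothetical helper mirroring the query-runner change below, not code from the commit):

    // Persisted segments expose a QueryableIndex; in-memory realtime segments
    // (IncrementalIndexSegment) expose only a StorageAdapter.
    Map<String, ColumnAnalysis> analyzeAnySegment(SegmentAnalyzer analyzer, Segment segment)
    {
      final QueryableIndex index = segment.asQueryableIndex();
      return index == null
             ? analyzer.analyze(segment.asStorageAdapter())  // realtime path (new)
             : analyzer.analyze(index);                      // persisted path (pre-existing)
    }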
SegmentAnalyzer.java
@@ -19,20 +19,26 @@

 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
 import com.metamx.common.logger.Logger;
 import com.metamx.common.StringUtils;
 import io.druid.query.metadata.metadata.ColumnAnalysis;
 import io.druid.segment.QueryableIndex;
+import io.druid.segment.StorageAdapter;
 import io.druid.segment.column.BitmapIndex;
 import io.druid.segment.column.Column;
+import io.druid.segment.column.ColumnCapabilities;
 import io.druid.segment.column.ComplexColumn;
 import io.druid.segment.column.ValueType;
+import io.druid.segment.data.Indexed;
 import io.druid.segment.serde.ComplexMetricSerde;
 import io.druid.segment.serde.ComplexMetrics;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class SegmentAnalyzer
@@ -61,7 +67,7 @@ public Map<String, ColumnAnalysis> analyze(QueryableIndex index)

       final ColumnAnalysis analysis;
       final ValueType type = capabilities.getType();
-      switch(type) {
+      switch (type) {
         case LONG:
           analysis = analyzeLongColumn(column);
           break;
@@ -82,7 +88,55 @@ public Map<String, ColumnAnalysis> analyze(QueryableIndex index)
       columns.put(columnName, analysis);
     }
 
-    columns.put(Column.TIME_COLUMN_NAME, lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP));
+    columns.put(
+        Column.TIME_COLUMN_NAME,
+        lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP)
+    );
 
     return columns;
   }
+
+  public Map<String, ColumnAnalysis> analyze(StorageAdapter adapter)
+  {
+    Preconditions.checkNotNull(adapter, "Adapter cannot be null");
+    Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
+    List<String> columnNames = getStorageAdapterColumnNames(adapter);
+
+    int numRows = adapter.getNumRows();
+    for (String columnName : columnNames) {
+      final ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnName);
+      final ColumnAnalysis analysis;
+
+      /**
+       * StorageAdapter doesn't provide a way to get column values, so size is
+       * not calculated for STRING and COMPLEX columns.
+       */
+      ValueType capType = capabilities.getType();
+      switch (capType) {
+        case LONG:
+          analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, Longs.BYTES);
+          break;
+        case FLOAT:
+          analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT);
+          break;
+        case STRING:
+          analysis = new ColumnAnalysis(capType.name(), 0, adapter.getDimensionCardinality(columnName), null);
+          break;
+        case COMPLEX:
+          analysis = new ColumnAnalysis(capType.name(), 0, null, null);
+          break;
+        default:
+          log.warn("Unknown column type[%s].", capType);
+          analysis = ColumnAnalysis.error(String.format("unknown_type_%s", capType));
+      }
+
+      columns.put(columnName, analysis);
+    }
+
+    columns.put(
+        Column.TIME_COLUMN_NAME,
+        lengthBasedAnalysisForAdapter(ValueType.LONG.name(), null, numRows, NUM_BYTES_IN_TIMESTAMP)
+    );
+
+    return columns;
+  }
@@ -154,4 +208,26 @@ public ColumnAnalysis analyzeComplexColumn(Column column)

     return new ColumnAnalysis(typeName, size, null, null);
   }
+
+  private List<String> getStorageAdapterColumnNames(StorageAdapter adapter)
+  {
+    Indexed<String> dims = adapter.getAvailableDimensions();
+    Iterable<String> metrics = adapter.getAvailableMetrics();
+    Iterable<String> columnNames = Iterables.concat(dims, metrics);
+    List<String> sortedColumnNames = Lists.newArrayList(columnNames);
+    Collections.sort(sortedColumnNames);
+    return sortedColumnNames;
+  }
+
+  private ColumnAnalysis lengthBasedAnalysisForAdapter(
+      String type, ColumnCapabilities capabilities,
+      int numRows, final int numBytes
+  )
+  {
+    if (capabilities != null && capabilities.hasMultipleValues()) {
+      return ColumnAnalysis.error("multi_value");
+    }
+    return new ColumnAnalysis(type, numRows * numBytes, null, null);
+  }
 }
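
A rough usage sketch of the new adapter-based overload (an assumption-laden sketch, not code from this commit: it presumes an IncrementalIndex named index built elsewhere, that IncrementalIndexStorageAdapter wraps it as shown, and that SegmentAnalyzer has a default constructor):

    // Analyze an in-memory index through its StorageAdapter.
    StorageAdapter adapter = new IncrementalIndexStorageAdapter(index);
    Map<String, ColumnAnalysis> columns = new SegmentAnalyzer().analyze(adapter);

    // LONG and FLOAT columns get a numRows * bytes-per-value size estimate;
    // STRING columns report cardinality but size 0, and COMPLEX columns report
    // neither, because StorageAdapter exposes no way to read raw column values.
    ColumnAnalysis timeAnalysis = columns.get(Column.TIME_COLUMN_NAME);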
SegmentMetadataQueryQueryToolChest.java
@@ -114,10 +114,6 @@ public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
           return arg1;
         }
 
-        if (!query.isMerge()) {
-          throw new ISE("Merging when a merge isn't supposed to happen[%s], [%s]", arg1, arg2);
-        }
-
         List<Interval> newIntervals = JodaUtils.condenseIntervals(
             Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
         );
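
The removed guard rejected merging when query.isMerge() was false. Presumably it had to go because a realtime node can now emit one analysis per in-memory hydrant of a segment, and those partial results must be combined even when the query did not ask for a merge; this rationale is inferred, not stated in the diff.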
SegmentMetadataQueryRunnerFactory.java
@@ -82,15 +82,17 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
     SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
 
     final QueryableIndex index = segment.asQueryableIndex();
+    final Map<String, ColumnAnalysis> analyzedColumns;
+    long totalSize = 0;
     if (index == null) {
-      return Sequences.empty();
+      // IncrementalIndexSegments (used by in-memory hydrants in the realtime service) do not have a QueryableIndex
+      analyzedColumns = analyzer.analyze(segment.asStorageAdapter());
+    } else {
+      analyzedColumns = analyzer.analyze(index);
+      // Initialize with the size of the whitespace, 1 byte per
+      totalSize = analyzedColumns.size() * index.getNumRows();
     }
 
-    final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(index);
-
-    // Initialize with the size of the whitespace, 1 byte per
-    long totalSize = analyzedColumns.size() * index.getNumRows();
-
     Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
     ColumnIncluderator includerator = query.getToInclude();
     for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
QueryableIndexStorageAdapter.java
@@ -104,6 +104,12 @@ public int getDimensionCardinality(String dimension)
     return column.getDictionaryEncoding().getCardinality();
   }
 
+  @Override
+  public int getNumRows()
+  {
+    return index.getNumRows();
+  }
+
   @Override
   public DateTime getMinTime()
   {
@@ -136,6 +142,12 @@ public Capabilities getCapabilities()
     return Capabilities.builder().dimensionValuesSorted(true).build();
   }
 
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    return index.getColumn(column).getCapabilities();
+  }
+
   @Override
   public DateTime getMaxIngestedEventTime()
   {
@@ -275,7 +287,10 @@ public void reset()
   }
 
   @Override
-  public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn)
+  public DimensionSelector makeDimensionSelector(
+      final String dimension,
+      @Nullable final ExtractionFn extractionFn
+  )
   {
     final Column columnDesc = index.getColumn(dimension);
     if (columnDesc == null) {
@@ -296,8 +311,7 @@ public DimensionSelector makeDimensionSelector(final String dimension, @Nullable

     if (column == null) {
       return NULL_DIMENSION_SELECTOR;
-    }
-    else if (columnDesc.getCapabilities().hasMultipleValues()) {
+    } else if (columnDesc.getCapabilities().hasMultipleValues()) {
       return new DimensionSelector()
       {
         @Override
@@ -325,7 +339,9 @@ public String lookupName(int id)
         public int lookupId(String name)
         {
           if (extractionFn != null) {
-            throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
+            throw new UnsupportedOperationException(
+                "cannot perform lookup when applying an extraction function"
+            );
           }
           return column.lookupId(name);
         }
@@ -388,7 +404,9 @@ public String lookupName(int id)
         public int lookupId(String name)
         {
           if (extractionFn != null) {
-            throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
+            throw new UnsupportedOperationException(
+                "cannot perform lookup when applying an extraction function"
+            );
           }
           return column.lookupId(name);
         }
3 changes: 3 additions & 0 deletions processing/src/main/java/io/druid/segment/StorageAdapter.java
@@ -17,6 +17,7 @@

 package io.druid.segment;
 
+import io.druid.segment.column.ColumnCapabilities;
 import io.druid.segment.data.Indexed;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -42,5 +43,7 @@ public interface StorageAdapter extends CursorFactory
   public DateTime getMinTime();
   public DateTime getMaxTime();
   public Capabilities getCapabilities();
+  public ColumnCapabilities getColumnCapabilities(String column);
+  public int getNumRows();
   public DateTime getMaxIngestedEventTime();
 }
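
With these two additions, every StorageAdapter implementation must report its row count and per-column capabilities. A small caller-side sketch (the column name is hypothetical, and implementations may return null or fail for unknown columns, so a null check is advisable):

    // Type-aware handling through the widened interface.
    ColumnCapabilities caps = adapter.getColumnCapabilities("page");  // hypothetical column
    if (caps != null && caps.getType() == ValueType.STRING) {
      // Cardinality is meaningful for dictionary-encoded string dimensions.
      int cardinality = adapter.getDimensionCardinality("page");
    }
    int numRows = adapter.getNumRows();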
IncrementalIndexStorageAdapter.java
@@ -42,6 +42,7 @@
 import io.druid.segment.SingleScanTimeDimSelector;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.column.Column;
+import io.druid.segment.column.ColumnCapabilities;
 import io.druid.segment.data.Indexed;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.data.ListIndexed;
@@ -102,7 +103,7 @@ public Iterable<String> getAvailableMetrics()
   @Override
   public int getDimensionCardinality(String dimension)
   {
-    if(dimension.equals(Column.TIME_COLUMN_NAME)) {
+    if (dimension.equals(Column.TIME_COLUMN_NAME)) {
       return Integer.MAX_VALUE;
     }
     IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
@@ -112,6 +113,12 @@ public int getDimensionCardinality(String dimension)
     return dimDim.size();
   }
 
+  @Override
+  public int getNumRows()
+  {
+    return index.size();
+  }
+
   @Override
   public DateTime getMinTime()
   {
@@ -130,6 +137,12 @@ public Capabilities getCapabilities()
     return Capabilities.builder().dimensionValuesSorted(false).build();
   }
 
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    return index.getCapabilities(column);
+  }
+
   @Override
   public DateTime getMaxIngestedEventTime()
   {
@@ -278,7 +291,10 @@ public void reset()
   }
 
   @Override
-  public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn)
+  public DimensionSelector makeDimensionSelector(
+      final String dimension,
+      @Nullable final ExtractionFn extractionFn
+  )
   {
     if (dimension.equals(Column.TIME_COLUMN_NAME)) {
       return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
@@ -310,7 +326,7 @@ public IndexedInts getRow()
       }
     }
     // check for null entry
-    if(vals.isEmpty() && dimValLookup.contains(null)){
+    if (vals.isEmpty() && dimValLookup.contains(null)) {
       int id = dimValLookup.getId(null);
       if (id < maxId) {
         vals.add(id);
@@ -369,7 +385,9 @@ public String lookupName(int id)
         public int lookupId(String name)
         {
           if (extractionFn != null) {
-            throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
+            throw new UnsupportedOperationException(
+                "cannot perform lookup when applying an extraction function"
+            );
           }
           return dimValLookup.getId(name);
         }
SegmentAnalyzerTest.java
@@ -44,13 +44,38 @@
 public class SegmentAnalyzerTest
 {
   @Test
-  public void testIncrementalDoesNotWork() throws Exception
+  public void testIncrementalWorks() throws Exception
   {
     final List<SegmentAnalysis> results = getSegmentAnalysises(
         new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null)
     );
 
-    Assert.assertEquals(0, results.size());
+    Assert.assertEquals(1, results.size());
+
+    final SegmentAnalysis analysis = results.get(0);
+    Assert.assertEquals(null, analysis.getId());
+
+    final Map<String, ColumnAnalysis> columns = analysis.getColumns();
+
+    Assert.assertEquals(
+        TestIndex.COLUMNS.length,
+        columns.size()
+    ); // All columns including time and empty/null column
+
+    for (String dimension : TestIndex.DIMENSIONS) {
+      final ColumnAnalysis columnAnalysis = columns.get(dimension);
+
+      Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType());
+      Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
+    }
+
+    for (String metric : TestIndex.METRICS) {
+      final ColumnAnalysis columnAnalysis = columns.get(metric);
+
+      Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType());
+      Assert.assertTrue(metric, columnAnalysis.getSize() > 0);
+      Assert.assertNull(metric, columnAnalysis.getCardinality());
+    }
+  }
 
   @Test
@@ -66,7 +91,10 @@ public void testMappedWorks() throws Exception
Assert.assertEquals("test_1", analysis.getId());

final Map<String, ColumnAnalysis> columns = analysis.getColumns();
Assert.assertEquals(TestIndex.COLUMNS.length -1, columns.size()); // All columns including time and excluding empty/null column
Assert.assertEquals(
TestIndex.COLUMNS.length - 1,
columns.size()
); // All columns including time and excluding empty/null column

for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension);
@@ -107,7 +135,7 @@ private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
     final SegmentMetadataQuery query = new SegmentMetadataQuery(
         new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, false
     );
-    HashMap<String,Object> context = new HashMap<String, Object>();
+    HashMap<String, Object> context = new HashMap<String, Object>();
     return Sequences.toList(query.run(runner, context), Lists.<SegmentAnalysis>newArrayList());
   }
 }
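
To eyeball the same information the rewritten test asserts, one could dump the per-column analysis from inside the test class (illustrative only; TestIndex and the getSegmentAnalysises helper are the ones used above):

    // Print type, size, and cardinality for every column of the incremental index.
    final List<SegmentAnalysis> results = getSegmentAnalysises(
        new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null)
    );
    for (Map.Entry<String, ColumnAnalysis> e : results.get(0).getColumns().entrySet()) {
      final ColumnAnalysis c = e.getValue();
      System.out.println(e.getKey() + ": type=" + c.getType()
                         + " size=" + c.getSize() + " cardinality=" + c.getCardinality());
    }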
