clean startree test cases with hll
jiaqi-g committed Aug 29, 2016
1 parent d37c378 commit 80c949d
Showing 8 changed files with 511 additions and 174 deletions.
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.common.utils.request;

import com.google.common.collect.ImmutableSet;
import com.linkedin.pinot.common.request.GroupBy;
import com.linkedin.pinot.common.segment.SegmentMetadata;
import com.linkedin.pinot.common.segment.StarTreeMetadata;
@@ -102,11 +103,13 @@ public static FilterQueryTree buildFilterQuery(Integer id, Map<Integer, FilterQu
return q2;
}

public static final Set<String> ALLOWED_AGGREGATION_FUNCTIONS = ImmutableSet.of("sum", "fasthll");

/**
* Returns true for the following, false otherwise:
* - Query is not aggregation/group-by
* - Segment does not contain star tree
* - The only aggregation function in the query is 'sum'
* - All aggregation functions in the query are in {@link #ALLOWED_AGGREGATION_FUNCTIONS}
* - All group by columns and predicate columns are materialized
* - Predicates do not contain any metric columns
* - Query consists only of simple predicates, conjoined by AND.
@@ -147,9 +150,10 @@ public static boolean isFitForStarTreeIndex(SegmentMetadata segmentMetadata, Fil
}
}

// We currently support only sum
// We currently support only a limited set of aggregation functions
for (AggregationInfo aggregationInfo : aggregationsInfo) {
if (!aggregationInfo.getAggregationType().equalsIgnoreCase("sum")) {
String aggregationFunctionName = aggregationInfo.getAggregationType().toLowerCase();
if (!ALLOWED_AGGREGATION_FUNCTIONS.contains(aggregationFunctionName)) {
return false;
}
}
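
With this change, star-tree serving is no longer hard-wired to 'sum': a query qualifies as long as every aggregation function's lowercased name appears in ALLOWED_AGGREGATION_FUNCTIONS, which currently holds "sum" and "fasthll". Below is a minimal standalone sketch of that membership check; the class and the allAggregationsAllowed helper are hypothetical illustrations, not the real isFitForStarTreeIndex signature.

import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Set;

public class AllowedAggregationCheckSketch {
  // Same constant as the one added in this commit.
  static final Set<String> ALLOWED_AGGREGATION_FUNCTIONS = ImmutableSet.of("sum", "fasthll");

  // Hypothetical helper, not part of Pinot: mirrors the loop in
  // isFitForStarTreeIndex by rejecting the query as soon as one aggregation
  // function falls outside the allowed set. Lower-casing keeps the check
  // case-insensitive, matching the old equalsIgnoreCase("sum") behavior.
  static boolean allAggregationsAllowed(Iterable<String> aggregationTypes) {
    for (String type : aggregationTypes) {
      if (!ALLOWED_AGGREGATION_FUNCTIONS.contains(type.toLowerCase())) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(allAggregationsAllowed(Arrays.asList("SUM", "fastHll"))); // true
    System.out.println(allAggregationsAllowed(Arrays.asList("sum", "avg")));     // false
  }
}
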
@@ -15,14 +15,8 @@
*/
package com.linkedin.pinot.core.startree;

import com.linkedin.pinot.common.data.DimensionFieldSpec;
import com.linkedin.pinot.common.data.FieldSpec;
import com.linkedin.pinot.common.data.MetricFieldSpec;
import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.common.data.StarTreeIndexSpec;
import com.linkedin.pinot.common.data.TimeFieldSpec;
import com.linkedin.pinot.common.request.BrokerRequest;
import com.linkedin.pinot.common.segment.ReadMode;
import com.linkedin.pinot.common.segment.SegmentMetadata;
import com.linkedin.pinot.common.utils.request.FilterQueryTree;
import com.linkedin.pinot.common.utils.request.RequestUtils;
@@ -31,27 +25,15 @@
import com.linkedin.pinot.core.common.Constants;
import com.linkedin.pinot.core.common.DataSource;
import com.linkedin.pinot.core.common.Operator;
import com.linkedin.pinot.core.data.GenericRow;
import com.linkedin.pinot.core.data.readers.FileFormat;
import com.linkedin.pinot.core.data.readers.RecordReader;
import com.linkedin.pinot.core.indexsegment.IndexSegment;
import com.linkedin.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import com.linkedin.pinot.core.operator.filter.StarTreeIndexOperator;
import com.linkedin.pinot.core.plan.FilterPlanNode;
import com.linkedin.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import com.linkedin.pinot.core.segment.index.loader.Loaders;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.pql.parsers.Pql2Compiler;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.math.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -60,16 +42,11 @@
/**
* Base class containing common functionality for all star-tree integration tests.
*/
public class BaseStarTreeIndexTest {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseStarTreeIndexTest.class);
public class BaseSumStarTreeIndexTest {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSumStarTreeIndexTest.class);

private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
private static final int NUM_DIMENSIONS = 4;
private static final int NUM_METRICS = 2;
private static final int METRIC_MAX_VALUE = 10000;
protected final long _randomSeed = System.nanoTime();


protected String[] _hardCodedQueries =
new String[]{
"select sum(m1) from T",
@@ -124,7 +101,7 @@ protected void testHardCodedQueries(IndexSegment segment, Schema schema) {
* @param metricNames
* @param brokerRequest
*/
protected Map<String, double[]> computeSumUsingRawDocs(IndexSegment segment, List<String> metricNames,
private Map<String, double[]> computeSumUsingRawDocs(IndexSegment segment, List<String> metricNames,
BrokerRequest brokerRequest) {
FilterPlanNode planNode = new FilterPlanNode(segment, brokerRequest);
Operator rawOperator = planNode.run();
@@ -143,7 +120,7 @@ protected Map<String, double[]> computeSumUsingRawDocs(IndexSegment segment, Lis
* @param brokerRequest
* @return
*/
Map<String, double[]> computeSumUsingAggregatedDocs(IndexSegment segment, List<String> metricNames,
private Map<String, double[]> computeSumUsingAggregatedDocs(IndexSegment segment, List<String> metricNames,
BrokerRequest brokerRequest) {
StarTreeIndexOperator starTreeOperator = new StarTreeIndexOperator(segment, brokerRequest);
starTreeOperator.open();
@@ -157,95 +134,6 @@ Map<String, double[]> computeSumUsingAggregatedDocs(IndexSegment segment, List<S
return computeSum(segment, starTreeDocIdIterator, metricNames, groupByColumns);
}
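
Both helpers above were tightened to private; testHardCodedQueries (collapsed above) invokes them for each hard-coded query and asserts that the star-tree path reproduces the raw-scan results. A minimal sketch of that comparison follows, under the assumption that both helpers return a Map<String, double[]> keyed by the group-by value; the assertResultsMatch helper is illustrative, not the actual test code.

import java.util.Map;
import org.testng.Assert;

public class SumComparisonSketch {
  // Hypothetical assertion helper, not part of the Pinot test: checks that the
  // star-tree aggregated results agree with the raw-document results for every
  // group-by key and every metric column.
  static void assertResultsMatch(Map<String, double[]> expectedFromRawDocs,
      Map<String, double[]> actualFromStarTree) {
    Assert.assertEquals(actualFromStarTree.size(), expectedFromRawDocs.size());
    for (Map.Entry<String, double[]> entry : expectedFromRawDocs.entrySet()) {
      double[] expected = entry.getValue();
      double[] actual = actualFromStarTree.get(entry.getKey());
      Assert.assertNotNull(actual, "Missing group-by key: " + entry.getKey());
      for (int i = 0; i < expected.length; i++) {
        Assert.assertEquals(actual[i], expected[i], 1e-5);
      }
    }
  }
}
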

/**
* Helper method to build the segment.
*
* @param segmentDirName
* @param segmentName
* @throws Exception
*/
Schema buildSegment(String segmentDirName, String segmentName, boolean enableOffHeapFormat)
throws Exception {
int ROWS = (int) MathUtils.factorial(NUM_DIMENSIONS);
Schema schema = new Schema();

for (int i = 0; i < NUM_DIMENSIONS; i++) {
String dimName = "d" + (i + 1);
DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec(dimName, FieldSpec.DataType.STRING, true);
schema.addField(dimensionFieldSpec);
}

schema.setTimeFieldSpec(new TimeFieldSpec(TIME_COLUMN_NAME, FieldSpec.DataType.INT, TimeUnit.DAYS));
for (int i = 0; i < NUM_METRICS; i++) {
String metricName = "m" + (i + 1);
MetricFieldSpec metricFieldSpec = new MetricFieldSpec(metricName, FieldSpec.DataType.INT);
schema.addField(metricFieldSpec);
}

SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
config.setEnableStarTreeIndex(true);
config.setOutDir(segmentDirName);
config.setFormat(FileFormat.AVRO);
config.setSegmentName(segmentName);
config.setStarTreeIndexSpec(buildStarTreeIndexSpec(enableOffHeapFormat));

final List<GenericRow> data = new ArrayList<>();
for (int row = 0; row < ROWS; row++) {
HashMap<String, Object> map = new HashMap<>();
for (int i = 0; i < NUM_DIMENSIONS; i++) {
String dimName = schema.getDimensionFieldSpecs().get(i).getName();
map.put(dimName, dimName + "-v" + row % (NUM_DIMENSIONS - i));
}

Random random = new Random(_randomSeed);
for (int i = 0; i < NUM_METRICS; i++) {
String metName = schema.getMetricFieldSpecs().get(i).getName();
map.put(metName, random.nextInt(METRIC_MAX_VALUE));
}

// Time column.
map.put("daysSinceEpoch", row % 7);

GenericRow genericRow = new GenericRow();
genericRow.init(map);
data.add(genericRow);
}

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
RecordReader reader = createReader(schema, data);
driver.init(config, reader);
driver.build();

LOGGER.info("Built segment {} at {}", segmentName, segmentDirName);
return schema;
}

/**
* Builds a star tree index spec for the test.
* - Use MaxLeafRecords as 1 to stress test.
* @return
* @param enableOffHeapFormat
*/
private StarTreeIndexSpec buildStarTreeIndexSpec(boolean enableOffHeapFormat) {
StarTreeIndexSpec spec = new StarTreeIndexSpec();
spec.setMaxLeafRecords(1);
spec.setEnableOffHeapFormat(enableOffHeapFormat);
return spec;
}

/**
* Helper method to load the segment.
*
* @param segmentDirName
* @param segmentName
* @throws Exception
*/
IndexSegment loadSegment(String segmentDirName, String segmentName)
throws Exception {
LOGGER.info("Loading segment {}", segmentName);
return Loaders.IndexSegment.load(new File(segmentDirName, segmentName), ReadMode.heap);
}

/**
* Compute 'sum' for a given list of metrics, by scanning the given set of doc-ids.
*
@@ -305,49 +193,4 @@ private Map<String, double[]> computeSum(IndexSegment segment, BlockDocIdIterato

return result;
}
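
computeSum itself is collapsed in this view; conceptually it walks the matching doc ids and accumulates each requested metric per group-by key. The stand-in below illustrates that flow under simplified assumptions (plain arrays instead of Pinot's BlockDocIdIterator and dictionary lookups) and is not the actual implementation.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;

public class ComputeSumSketch {
  // Simplified stand-in, not the Pinot implementation: sums each metric over a
  // stream of matching doc ids, grouping by a pre-computed group key per doc.
  static Map<String, double[]> computeSum(PrimitiveIterator.OfInt docIdIterator,
      List<String> metricNames, String[] groupKeyPerDoc, double[][] metricValuesPerDoc) {
    Map<String, double[]> result = new HashMap<>();
    while (docIdIterator.hasNext()) {
      int docId = docIdIterator.nextInt();
      double[] sums = result.computeIfAbsent(groupKeyPerDoc[docId],
          k -> new double[metricNames.size()]);
      for (int m = 0; m < metricNames.size(); m++) {
        sums[m] += metricValuesPerDoc[docId][m];
      }
    }
    return result;
  }
}
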

private RecordReader createReader(final Schema schema, final List<GenericRow> data) {
return new RecordReader() {

int counter = 0;

@Override
public void rewind()
throws Exception {
counter = 0;
}

@Override
public GenericRow next() {
return data.get(counter++);
}

@Override
public void init()
throws Exception {

}

@Override
public boolean hasNext() {
return counter < data.size();
}

@Override
public Schema getSchema() {
return schema;
}

@Override
public Map<String, MutableLong> getNullCountMap() {
return null;
}

@Override
public void close()
throws Exception {

}
};
}
}
