clean startree test cases with hll #429

Merged
merged 1 commit on Aug 29, 2016
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.common.utils.request;

+import com.google.common.collect.ImmutableSet;
import com.linkedin.pinot.common.request.GroupBy;
import com.linkedin.pinot.common.segment.SegmentMetadata;
import com.linkedin.pinot.common.segment.StarTreeMetadata;
@@ -102,11 +103,13 @@ public static FilterQueryTree buildFilterQuery(Integer id, Map<Integer, FilterQu
return q2;
}

+public static final Set<String> ALLOWED_AGGREGATION_FUNCTIONS = ImmutableSet.of("sum", "fasthll");
+
/**
* Returns true only if all of the following hold, false otherwise:
* - Query is an aggregation/group-by query
* - Segment contains a star tree index
-* - The only aggregation function in the query is 'sum'
+* - Every aggregation function in the query is in {@link #ALLOWED_AGGREGATION_FUNCTIONS}
* - All group by columns and predicate columns are materialized
* - Predicates do not contain any metric columns
* - Query consists only of simple predicates, conjoined by AND.
@@ -147,9 +150,10 @@ public static boolean isFitForStarTreeIndex(SegmentMetadata segmentMetadata, Fil
}
}

-// We currently support only sum
+// We currently support only a limited set of aggregations
for (AggregationInfo aggregationInfo : aggregationsInfo) {
-if (!aggregationInfo.getAggregationType().equalsIgnoreCase("sum")) {
+String aggregationFunctionName = aggregationInfo.getAggregationType().toLowerCase();
+if (!ALLOWED_AGGREGATION_FUNCTIONS.contains(aggregationFunctionName)) {
return false;
}
}
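For readers skimming the diff: the two hunks above replace a hard-coded check for "sum" with an allow-list lookup, so fasthll queries can now be served from the star tree as well. Below is a minimal, self-contained sketch of the same pattern; it is not the PR's code, the class and method names are hypothetical, and only Guava's ImmutableSet is assumed.

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;

// Hypothetical sketch of the allow-list pattern introduced above.
public final class StarTreeAggregationCheck {
  // Mirrors ALLOWED_AGGREGATION_FUNCTIONS: only these aggregation functions
  // can be answered from star-tree pre-aggregated documents.
  private static final Set<String> ALLOWED = ImmutableSet.of("sum", "fasthll");

  // Lower-case each function name once, then test set membership; bail out on
  // the first unsupported function, mirroring the early-return style of
  // isFitForStarTreeIndex.
  public static boolean allAggregationsSupported(Iterable<String> aggregationTypes) {
    for (String type : aggregationTypes) {
      if (!ALLOWED.contains(type.toLowerCase())) {
        return false;
      }
    }
    return true;
  }
}
```

Keeping the set in one public constant means the javadoc reference and the runtime check cannot drift apart as more functions are enabled.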
@@ -15,14 +15,8 @@
*/
package com.linkedin.pinot.core.startree;

-import com.linkedin.pinot.common.data.DimensionFieldSpec;
-import com.linkedin.pinot.common.data.FieldSpec;
-import com.linkedin.pinot.common.data.MetricFieldSpec;
import com.linkedin.pinot.common.data.Schema;
-import com.linkedin.pinot.common.data.StarTreeIndexSpec;
-import com.linkedin.pinot.common.data.TimeFieldSpec;
import com.linkedin.pinot.common.request.BrokerRequest;
-import com.linkedin.pinot.common.segment.ReadMode;
import com.linkedin.pinot.common.segment.SegmentMetadata;
import com.linkedin.pinot.common.utils.request.FilterQueryTree;
import com.linkedin.pinot.common.utils.request.RequestUtils;
@@ -31,27 +25,15 @@
import com.linkedin.pinot.core.common.Constants;
import com.linkedin.pinot.core.common.DataSource;
import com.linkedin.pinot.core.common.Operator;
-import com.linkedin.pinot.core.data.GenericRow;
-import com.linkedin.pinot.core.data.readers.FileFormat;
-import com.linkedin.pinot.core.data.readers.RecordReader;
import com.linkedin.pinot.core.indexsegment.IndexSegment;
-import com.linkedin.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import com.linkedin.pinot.core.operator.filter.StarTreeIndexOperator;
import com.linkedin.pinot.core.plan.FilterPlanNode;
-import com.linkedin.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import com.linkedin.pinot.core.segment.index.loader.Loaders;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.pql.parsers.Pql2Compiler;
-import java.io.File;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.commons.math.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -60,16 +42,11 @@
/**
* Base class containing common functionality for all star-tree integration tests.
*/
-public class BaseStarTreeIndexTest {
-private static final Logger LOGGER = LoggerFactory.getLogger(BaseStarTreeIndexTest.class);
+public class BaseSumStarTreeIndexTest {
+private static final Logger LOGGER = LoggerFactory.getLogger(BaseSumStarTreeIndexTest.class);

-private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
-private static final int NUM_DIMENSIONS = 4;
-private static final int NUM_METRICS = 2;
-private static final int METRIC_MAX_VALUE = 10000;
-protected final long _randomSeed = System.nanoTime();


protected String[] _hardCodedQueries =
new String[]{
"select sum(m1) from T",
@@ -124,7 +101,7 @@ protected void testHardCodedQueries(IndexSegment segment, Schema schema) {
* @param metricNames
* @param brokerRequest
*/
-protected Map<String, double[]> computeSumUsingRawDocs(IndexSegment segment, List<String> metricNames,
+private Map<String, double[]> computeSumUsingRawDocs(IndexSegment segment, List<String> metricNames,
BrokerRequest brokerRequest) {
FilterPlanNode planNode = new FilterPlanNode(segment, brokerRequest);
Operator rawOperator = planNode.run();
@@ -143,7 +120,7 @@ protected Map<String, double[]> computeSumUsingRawDocs(IndexSegment segment, Lis
* @param brokerRequest
* @return
*/
-Map<String, double[]> computeSumUsingAggregatedDocs(IndexSegment segment, List<String> metricNames,
+private Map<String, double[]> computeSumUsingAggregatedDocs(IndexSegment segment, List<String> metricNames,
BrokerRequest brokerRequest) {
StarTreeIndexOperator starTreeOperator = new StarTreeIndexOperator(segment, brokerRequest);
starTreeOperator.open();
@@ -157,95 +134,6 @@ Map<String, double[]> computeSumUsingAggregatedDocs(IndexSegment segment, List<S
return computeSum(segment, starTreeDocIdIterator, metricNames, groupByColumns);
}
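Both compute paths share one scan shape: open an operator, take the doc-id iterator from its block, and accumulate until the EOF sentinel. A sketch of that loop follows, assuming nextBlock()/getBlockDocIdSet() behave for the star-tree operator as they do for the raw-docs operator above; accumulate() is a hypothetical stand-in for the per-document work done in computeSum.

```java
// Sketch: walking the doc-ids produced by the star-tree operator, using the
// classes already imported in this file. Constants.EOF terminates the stream.
StarTreeIndexOperator starTreeOperator = new StarTreeIndexOperator(segment, brokerRequest);
starTreeOperator.open();
BlockDocIdIterator iterator = starTreeOperator.nextBlock().getBlockDocIdSet().iterator();
int docId;
while ((docId = iterator.next()) != Constants.EOF) {
  accumulate(docId); // hypothetical: fold this document's metric values into the result
}
```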

-/**
-* Helper method to build the segment.
-*
-* @param segmentDirName
-* @param segmentName
-* @throws Exception
-*/
-Schema buildSegment(String segmentDirName, String segmentName, boolean enableOffHeapFormat)
-throws Exception {
-int ROWS = (int) MathUtils.factorial(NUM_DIMENSIONS);
-Schema schema = new Schema();
-
-for (int i = 0; i < NUM_DIMENSIONS; i++) {
-String dimName = "d" + (i + 1);
-DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec(dimName, FieldSpec.DataType.STRING, true);
-schema.addField(dimensionFieldSpec);
-}
-
-schema.setTimeFieldSpec(new TimeFieldSpec(TIME_COLUMN_NAME, FieldSpec.DataType.INT, TimeUnit.DAYS));
-for (int i = 0; i < NUM_METRICS; i++) {
-String metricName = "m" + (i + 1);
-MetricFieldSpec metricFieldSpec = new MetricFieldSpec(metricName, FieldSpec.DataType.INT);
-schema.addField(metricFieldSpec);
-}
-
-SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
-config.setEnableStarTreeIndex(true);
-config.setOutDir(segmentDirName);
-config.setFormat(FileFormat.AVRO);
-config.setSegmentName(segmentName);
-config.setStarTreeIndexSpec(buildStarTreeIndexSpec(enableOffHeapFormat));
-
-final List<GenericRow> data = new ArrayList<>();
-for (int row = 0; row < ROWS; row++) {
-HashMap<String, Object> map = new HashMap<>();
-for (int i = 0; i < NUM_DIMENSIONS; i++) {
-String dimName = schema.getDimensionFieldSpecs().get(i).getName();
-map.put(dimName, dimName + "-v" + row % (NUM_DIMENSIONS - i));
-}
-
-Random random = new Random(_randomSeed);
-for (int i = 0; i < NUM_METRICS; i++) {
-String metName = schema.getMetricFieldSpecs().get(i).getName();
-map.put(metName, random.nextInt(METRIC_MAX_VALUE));
-}
-
-// Time column.
-map.put("daysSinceEpoch", row % 7);
-
-GenericRow genericRow = new GenericRow();
-genericRow.init(map);
-data.add(genericRow);
-}
-
-SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-RecordReader reader = createReader(schema, data);
-driver.init(config, reader);
-driver.build();
-
-LOGGER.info("Built segment {} at {}", segmentName, segmentDirName);
-return schema;
-}
-
-/**
-* Builds a star tree index spec for the test.
-* - Use MaxLeafRecords as 1 to stress test.
-* @return
-* @param enableOffHeapFormat
-*/
-private StarTreeIndexSpec buildStarTreeIndexSpec(boolean enableOffHeapFormat) {
-StarTreeIndexSpec spec = new StarTreeIndexSpec();
-spec.setMaxLeafRecords(1);
-spec.setEnableOffHeapFormat(enableOffHeapFormat);
-return spec;
-}
-
-/**
-* Helper method to load the segment.
-*
-* @param segmentDirName
-* @param segmentName
-* @throws Exception
-*/
-IndexSegment loadSegment(String segmentDirName, String segmentName)
-throws Exception {
-LOGGER.info("Loading segment {}", segmentName);
-return Loaders.IndexSegment.load(new File(segmentDirName, segmentName), ReadMode.heap);
-}
-
/**
* Compute 'sum' for a given list of metrics, by scanning the given set of doc-ids.
*
@@ -305,49 +193,4 @@ private Map<String, double[]> computeSum(IndexSegment segment, BlockDocIdIterato

return result;
}

-private RecordReader createReader(final Schema schema, final List<GenericRow> data) {
-return new RecordReader() {
-
-int counter = 0;
-
-@Override
-public void rewind()
-throws Exception {
-counter = 0;
-}
-
-@Override
-public GenericRow next() {
-return data.get(counter++);
-}
-
-@Override
-public void init()
-throws Exception {
-
-}
-
-@Override
-public boolean hasNext() {
-return counter < data.size();
-}
-
-@Override
-public Schema getSchema() {
-return schema;
-}
-
-@Override
-public Map<String, MutableLong> getNullCountMap() {
-return null;
-}
-
-@Override
-public void close()
-throws Exception {
-
-}
-};
-}
}
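After this cleanup the class is purely a sum-comparison harness: each hard-coded query is compiled, evaluated once over raw documents and once over star-tree aggregated documents, and the two result maps must agree. A minimal sketch of that assertion loop follows; compileToBrokerRequest, segment, and metricNames are hypothetical wiring around the private helpers shown in the diff.

```java
// Sketch of the raw-vs-star-tree comparison that testHardCodedQueries performs.
for (String query : _hardCodedQueries) {
  BrokerRequest brokerRequest = compileToBrokerRequest(query); // hypothetical: wraps Pql2Compiler
  Map<String, double[]> rawSums = computeSumUsingRawDocs(segment, metricNames, brokerRequest);
  Map<String, double[]> starTreeSums = computeSumUsingAggregatedDocs(segment, metricNames, brokerRequest);
  Assert.assertEquals(starTreeSums.keySet(), rawSums.keySet());
  for (Map.Entry<String, double[]> entry : rawSums.entrySet()) {
    double[] expected = entry.getValue();
    double[] actual = starTreeSums.get(entry.getKey());
    for (int i = 0; i < expected.length; i++) {
      // Pre-aggregation must never change a sum.
      Assert.assertEquals(actual[i], expected[i], 1e-5);
    }
  }
}
```

Because the raw scan serves as ground truth, the check holds for whatever segment the concrete test builds.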