Fix the issue where selection order-by does not give correct ordering (#4540)

Fix the issue where selection order-by does not give correct ordering.
Enhance the integration test to check the result order for selection order-by queries.
xiangfu0 authored and Jackie-Jiang committed Aug 22, 2019
1 parent 66d31d6 commit cf9974c
Showing 5 changed files with 73 additions and 15 deletions.
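In brief: the row comparator in SelectionOperatorService indexed each row by the sort column's position in the sort sequence rather than by its position in the data schema, so whenever the two orders differed the wrong values were compared and the result order was wrong. A minimal, self-contained sketch of the mistake and the fix (the schema and row values below are hypothetical, chosen to mirror the new integration test):

import java.io.Serializable;

public class SortIndexDemo {
  public static void main(String[] args) {
    // Hypothetical data schema order: [ArrTime, Carrier, DaysSinceEpoch].
    // For "SELECT ArrTime, Carrier, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch",
    // the sort column sits at schema index 2, not at sort-sequence index 0.
    int[] sortColumnIdx = {2};

    Serializable[] row1 = {700, "DL", 16100};
    Serializable[] row2 = {650, "AA", 16200};

    // Buggy: indexes by sort-sequence position 0, silently comparing ArrTime.
    int buggy = Integer.compare((Integer) row1[0], (Integer) row2[0]);
    // Fixed: resolves the sort column through its data-schema index first.
    int fixed = Integer.compare((Integer) row1[sortColumnIdx[0]], (Integer) row2[sortColumnIdx[0]]);

    System.out.println("buggy: " + buggy + ", fixed: " + fixed); // buggy: 1, fixed: -1
  }
}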
@@ -70,6 +70,7 @@
 public class SelectionOperatorService {
   private final List<String> _selectionColumns;
   private final List<SelectionSort> _sortSequence;
+  private final int[] _sortColumnIdx;
   private final DataSchema _dataSchema;
   private final int _selectionOffset;
   private final int _maxNumRows;
@@ -87,6 +88,8 @@ public SelectionOperatorService(@Nonnull Selection selection, @Nonnull IndexSegm
     _selectionColumns = SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), indexSegment);
     _sortSequence = getSortSequence(selection.getSelectionSortSequence());
     _dataSchema = SelectionOperatorUtils.extractDataSchema(_sortSequence, _selectionColumns, indexSegment);
+    _sortColumnIdx =
+        SelectionOperatorUtils.getColumnIndices(SelectionOperatorUtils.extractSortColumns(_sortSequence), _dataSchema);
     // Select rows from offset to offset + size.
     _selectionOffset = selection.getOffset();
     _maxNumRows = _selectionOffset + selection.getSize();
@@ -103,6 +106,8 @@ public SelectionOperatorService(@Nonnull Selection selection, @Nonnull DataSchem
     _selectionColumns = SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), dataSchema);
     _sortSequence = getSortSequence(selection.getSelectionSortSequence());
     _dataSchema = dataSchema;
+    _sortColumnIdx =
+        SelectionOperatorUtils.getColumnIndices(SelectionOperatorUtils.extractSortColumns(_sortSequence), _dataSchema);
     // Select rows from offset to offset + size.
     _selectionOffset = selection.getOffset();
     _maxNumRows = _selectionOffset + selection.getSize();
@@ -211,8 +216,9 @@ public int compare(Serializable[] o1, Serializable[] o2) {
       for (int i = 0; i < numSortColumns; i++) {
         int ret = 0;
         SelectionSort selectionSort = _sortSequence.get(i);
-        Serializable v1 = o1[i];
-        Serializable v2 = o2[i];
+        int colIdx = _sortColumnIdx[i];
+        Serializable v1 = o1[colIdx];
+        Serializable v2 = o2[colIdx];

         // Only compare single-value columns.
         if (v1 instanceof Number) {
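With _sortColumnIdx precomputed in both constructors, the comparator resolves sort values through the data schema. A condensed sketch of the corrected pattern as a standalone comparator; the descending-order handling (via the Thrift-generated isIsAsc() accessor) is illustrative, not copied from the diff:

import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import org.apache.pinot.common.request.SelectionSort;

// Sketch only: shows the corrected indexing; null values and multi-value
// columns are ignored here for brevity.
class OrderByRowComparator implements Comparator<Serializable[]> {
  private final List<SelectionSort> _sortSequence;
  private final int[] _sortColumnIdx; // data-schema index per sort column

  OrderByRowComparator(List<SelectionSort> sortSequence, int[] sortColumnIdx) {
    _sortSequence = sortSequence;
    _sortColumnIdx = sortColumnIdx;
  }

  @Override
  public int compare(Serializable[] o1, Serializable[] o2) {
    for (int i = 0; i < _sortSequence.size(); i++) {
      // The fix: index rows by the sort column's data-schema position.
      int colIdx = _sortColumnIdx[i];
      Serializable v1 = o1[colIdx];
      Serializable v2 = o2[colIdx];

      int ret;
      if (v1 instanceof Number && v2 instanceof Number) {
        ret = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
      } else {
        ret = v1.toString().compareTo(v2.toString());
      }
      if (!_sortSequence.get(i).isIsAsc()) {
        ret = -ret; // flip the comparison for descending sort columns
      }
      if (ret != 0) {
        return ret;
      }
    }
    return 0;
  }
}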
@@ -437,8 +437,7 @@ public static SelectionResults renderSelectionResultsWithoutOrdering(@Nonnull Li
    * @param dataSchema data schema.
    * @return column indices
    */
-  public static int[] getColumnIndices(@Nonnull List<String> selectionColumns,
-      @Nonnull DataSchema dataSchema) {
+  public static int[] getColumnIndices(@Nonnull List<String> selectionColumns, @Nonnull DataSchema dataSchema) {
     int numSelectionColumns = selectionColumns.size();
     int[] columnIndices = new int[numSelectionColumns];
     int numColumnsInDataSchema = dataSchema.size();
@@ -452,8 +451,6 @@ public static int[] getColumnIndices(@Nonnull List<String> selectionColumns,
     return columnIndices;
   }
-
-

   /**
    * Extract columns from the row based on the given column indices.
    * <p>The extracted row is used to build the {@link SelectionResults}.
@@ -679,4 +676,14 @@ public static <T> void addToPriorityQueue(@Nonnull T value, @Nonnull PriorityQue
       queue.offer(value);
     }
   }
+
+  public static List<String> extractSortColumns(List<SelectionSort> sortSequence) {
+    List<String> columns = new ArrayList<>();
+    if (sortSequence != null) {
+      for (SelectionSort s : sortSequence) {
+        columns.add(s.column);
+      }
+    }
+    return columns;
+  }
 }
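The two helpers above compose: extractSortColumns pulls the ORDER BY column names out of the sort sequence, and getColumnIndices maps each name to its position in the data schema. A rough usage sketch; the DataSchema constructor and the Thrift-generated SelectionSort setters used here are assumptions about this Pinot version, not taken from the diff:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;

public class SortIndexResolutionDemo {
  public static void main(String[] args) {
    // Assumed constructor: column names paired with their data types.
    DataSchema dataSchema = new DataSchema(new String[]{"ArrTime", "Carrier", "DaysSinceEpoch"},
        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT});

    // ORDER BY DaysSinceEpoch DESC, built with the assumed Thrift setters.
    SelectionSort selectionSort = new SelectionSort();
    selectionSort.setColumn("DaysSinceEpoch");
    selectionSort.setIsAsc(false);
    List<SelectionSort> sortSequence = Collections.singletonList(selectionSort);

    List<String> sortColumns = SelectionOperatorUtils.extractSortColumns(sortSequence); // ["DaysSinceEpoch"]
    int[] sortColumnIdx = SelectionOperatorUtils.getColumnIndices(sortColumns, dataSchema); // [2]
    System.out.println(sortColumns + " -> " + Arrays.toString(sortColumnIdx));
  }
}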
@@ -49,21 +49,23 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.Utf8;
+import org.apache.pinot.broker.requesthandler.PinotQueryParserFactory;
 import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
 import org.apache.pinot.client.Request;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.core.realtime.stream.StreamDataProducer;
 import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.core.util.AvroUtils;
 import org.apache.pinot.server.util.SegmentTestUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.testng.Assert;


@@ -253,7 +255,7 @@ public static void buildSegmentsFromAvro(List<File> avroFiles, int baseSegmentIn
         SegmentTestUtils.getSegmentGeneratorConfig(avroFile, outputDir, TimeUnit.DAYS, tableName, pinotSchema);

     // Test segment with space and special character in the file name
-    segmentGeneratorConfig.setSegmentNamePostfix(String.valueOf(segmentIndex) + " %");
+    segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");

     // Cannot build star-tree V1 and V2 at same time
     if (starTreeV2BuilderConfigs != null) {
@@ -324,7 +326,8 @@ public static void pushAvroIntoKafka(@Nonnull List<File> avroFiles, @Nonnull Str
     properties.put("request.required.acks", "1");
     properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");

-    StreamDataProducer producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+    StreamDataProducer producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);

     try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
       for (File avroFile : avroFiles) {
@@ -342,7 +345,7 @@ public static void pushAvroIntoKafka(@Nonnull List<File> avroFiles, @Nonnull Str
           byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
               : (genericRecord.get(partitionColumn)).toString().getBytes();
           byte[] bytes = outputStream.toByteArray();
-          producer.produce(kafkaTopic,keyBytes,bytes);
+          producer.produce(kafkaTopic, keyBytes, bytes);
         }
       }
     }
@@ -372,7 +375,8 @@ public static void pushRandomAvroIntoKafka(@Nonnull File avroFile, @Nonnull Stri
     properties.put("request.required.acks", "1");
     properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");

-    StreamDataProducer producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+    StreamDataProducer producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
     try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
       try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
         BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null);
@@ -394,7 +398,7 @@ public static void pushRandomAvroIntoKafka(@Nonnull File avroFile, @Nonnull Stri
             : (genericRecord.get(partitionColumn)).toString().getBytes();
         byte[] bytes = outputStream.toByteArray();

-        producer.produce(kafkaTopic,keyBytes,bytes);
+        producer.produce(kafkaTopic, keyBytes, bytes);
         numKafkaMessagesToPush--;
       }
     }
@@ -651,7 +655,11 @@ public static void testQuery(@Nonnull String pinotQuery, @Nonnull String queryFo
       ResultSet h2ResultSet = h2statement.getResultSet();
       ResultSetMetaData h2MetaData = h2ResultSet.getMetaData();

+      List<String> orderByColumns = SelectionOperatorUtils.extractSortColumns(
+          PinotQueryParserFactory.get("pql").compileToBrokerRequest(pinotQuery).getSelections()
+              .getSelectionSortSequence());
       Set<String> expectedValues = new HashSet<>();
+      List<String> expectedOrderByValues = new ArrayList<>();
       Map<String, String> reusableExpectedValueMap = new HashMap<>();
       Map<String, List<String>> reusableMultiValuesMap = new HashMap<>();
       List<String> reusableColumnOrder = new ArrayList<>();
@@ -702,11 +710,15 @@ public static void testQuery(@Nonnull String pinotQuery, @Nonnull String queryFo

         // Build expected value String
         StringBuilder expectedValue = new StringBuilder();
+        StringBuilder expectedOrderByValue = new StringBuilder();
         for (String column : reusableColumnOrder) {
           expectedValue.append(column).append(':').append(reusableExpectedValueMap.get(column)).append(' ');
+          if (orderByColumns.contains(column)) {
+            expectedOrderByValue.append(column).append(':').append(reusableExpectedValueMap.get(column)).append(' ');
+          }
         }

         expectedValues.add(expectedValue.toString());
+        expectedOrderByValues.add(expectedOrderByValue.toString());
       }

       org.apache.pinot.client.ResultSet pinotSelectionResultSet = pinotResultSetGroup.getResultSet(0);
@@ -739,6 +751,7 @@ public static void testQuery(@Nonnull String pinotQuery, @Nonnull String queryFo
       for (int rowIndex = 0; rowIndex < pinotNumRows; rowIndex++) {
         // Build actual value String.
         StringBuilder actualValueBuilder = new StringBuilder();
+        StringBuilder actualOrderByValueBuilder = new StringBuilder();
         for (int columnIndex = 0; columnIndex < numColumns; columnIndex++) {
           // Convert column name to all uppercase to make it compatible with H2
           String columnName = pinotSelectionResultSet.getColumnName(columnIndex).toUpperCase();
@@ -759,19 +772,35 @@ public static void testQuery(@Nonnull String pinotQuery, @Nonnull String queryFo
             }
             Collections.sort(multiValue);
             actualValueBuilder.append(columnName).append(':').append(multiValue.toString()).append(' ');
+            if (orderByColumns.contains(columnName)) {
+              actualOrderByValueBuilder.append(columnName).append(':').append(columnResult).append(' ');
+            }
           } else {
             // Single-value column
             actualValueBuilder.append(columnName).append(':').append(columnResult).append(' ');
+            if (orderByColumns.contains(columnName)) {
+              actualOrderByValueBuilder.append(columnName).append(':').append(columnResult).append(' ');
+            }
           }
         }
         String actualValue = actualValueBuilder.toString();

+        String actualOrderByValue = actualOrderByValueBuilder.toString();
         // Check actual value in expected values set
         if (!expectedValues.contains(actualValue)) {
           String failureMessage = "Selection result returned in Pinot but not in H2: " + actualValue;
           failure(pinotQuery, sqlQueries, failureMessage);
           return;
         }
+        if (!orderByColumns.isEmpty()) {
+          // Check that the actual order-by value matches the expected order-by value at the same row index.
+          if (!expectedOrderByValues.get(rowIndex).equals(actualOrderByValue)) {
+            String failureMessage = String.format(
+                "Selection order-by result at row index: %d in Pinot: [ %s ] is different from the result in H2: [ %s ].",
+                rowIndex, actualOrderByValue, expectedOrderByValues.get(rowIndex));
+            failure(pinotQuery, sqlQueries, failureMessage);
+            return;
+          }
+        }
       }
     } else {
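The idea behind the new order-by check in testQuery, reduced to its essence: H2, issued the same ORDER BY, serves as the ordering oracle, and each Pinot row's concatenated order-by values must equal the oracle's row at the same index. A simplified sketch with hypothetical values:

import java.util.Arrays;
import java.util.List;

public class OrderByCheckDemo {
  // Returns null on success, else a message pinpointing the first out-of-order row.
  static String checkOrder(List<String> expected, List<String> actual) {
    for (int rowIndex = 0; rowIndex < actual.size(); rowIndex++) {
      if (!expected.get(rowIndex).equals(actual.get(rowIndex))) {
        return String.format("Row %d: Pinot [ %s ] differs from H2 [ %s ]", rowIndex,
            actual.get(rowIndex), expected.get(rowIndex));
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Hypothetical per-row order-by strings for "ORDER BY DaysSinceEpoch DESC".
    List<String> fromH2 = Arrays.asList("DAYSSINCEEPOCH:16200 ", "DAYSSINCEEPOCH:16100 ");
    List<String> fromPinot = Arrays.asList("DAYSSINCEEPOCH:16100 ", "DAYSSINCEEPOCH:16200 ");
    System.out.println(checkOrder(fromH2, fromPinot)); // reports a mismatch at row 0
  }
}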
@@ -681,6 +681,22 @@ public void testQueryWithRepeatedColumns()
         "SELECT ArrTime, ArrTime, count(*) FROM mytable WHERE DaysSinceEpoch <= 16312 AND Carrier = 'DL' group by ArrTime, ArrTime"));
   }

+  @Test
+  public void testQueryWithOrderby()
+      throws Exception {
+    // Test selection order-by on a single numeric column
+    String query = "SELECT ArrTime, Carrier, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
+    testQuery(query, Collections.singletonList(query));
+
+    // Test selection order-by on a string column, with the selection columns in a different order
+    query = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable ORDER BY Carrier DESC";
+    testQuery(query, Collections.singletonList(query));
+
+    // Test selection order-by on multiple columns
+    query = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC";
+    testQuery(query, Collections.singletonList(query));
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
@@ -7648,7 +7648,7 @@
 {"hsqls":["SELECT COUNT(*) FROM mytable WHERE FlightNum >= 1672 AND (DivLongestGTimes__MV0 BETWEEN 136.0 AND 66.0 OR DivLongestGTimes__MV1 BETWEEN 136.0 AND 66.0 OR DivLongestGTimes__MV2 BETWEEN 136.0 AND 66.0 OR DivLongestGTimes__MV3 BETWEEN 136.0 AND 66.0 OR DivLongestGTimes__MV4 BETWEEN 136.0 AND 66.0) LIMIT 10000"],"pql":"SELECT COUNT(*) FROM mytable WHERE FlightNum >= 1672 AND DivLongestGTimes BETWEEN 136.0 AND 66.0 TOP 4"}
 {"hsqls":["SELECT DepartureDelayGroups, FirstDepTime, Distance, MAX(WheelsOff) FROM mytable WHERE ArrDelayMinutes <> 218.0 OR Diverted > 0 GROUP BY DepartureDelayGroups, FirstDepTime, Distance LIMIT 10000","SELECT DepartureDelayGroups, FirstDepTime, Distance, MAX(SecurityDelay) FROM mytable WHERE ArrDelayMinutes <> 218.0 OR Diverted > 0 GROUP BY DepartureDelayGroups, FirstDepTime, Distance LIMIT 10000"],"pql":"SELECT MAX(WheelsOff), MAX(SecurityDelay) FROM mytable WHERE ArrDelayMinutes <> 218.0 OR Diverted > 0 GROUP BY DepartureDelayGroups, FirstDepTime, Distance TOP 15"}
 {"hsqls":["SELECT DepTimeBlk, DivActualElapsedTime FROM mytable WHERE DestState IN ('AR') AND DistanceGroup BETWEEN 8 AND 7 ORDER BY Year, Quarter LIMIT 10000"],"pql":"SELECT DepTimeBlk, DivActualElapsedTime FROM mytable WHERE DestState IN ('AR') AND DistanceGroup BETWEEN 8 AND 7 ORDER BY Year, Quarter LIMIT 21"}
-{"hsqls":["SELECT ArrTimeBlk, AirTime FROM mytable WHERE LongestAddGTime IN (30, 42, 32, 74, 14) OR Month <= 1 ORDER BY ArrTimeBlk LIMIT 10000"],"pql":"SELECT ArrTimeBlk, AirTime FROM mytable WHERE LongestAddGTime IN (30, 42, 32, 74, 14) OR Month <= 1 ORDER BY ArrTimeBlk LIMIT 2"}
+{"hsqls":["SELECT ArrTimeBlk, AirTime FROM mytable WHERE LongestAddGTime IN (30, 42, 32, 74, 14) OR Month <= 1 ORDER BY ArrTimeBlk, AirTime LIMIT 10000"],"pql":"SELECT ArrTimeBlk, AirTime FROM mytable WHERE LongestAddGTime IN (30, 42, 32, 74, 14) OR Month <= 1 ORDER BY ArrTimeBlk, AirTime LIMIT 2"}
 {"hsqls":["SELECT FirstDepTime, WeatherDelay, DepTimeBlk, MAX(WheelsOff) FROM mytable WHERE WheelsOff BETWEEN 1128 AND 1013 GROUP BY FirstDepTime, WeatherDelay, DepTimeBlk LIMIT 10000","SELECT FirstDepTime, WeatherDelay, DepTimeBlk, SUM(WheelsOff) FROM mytable WHERE WheelsOff BETWEEN 1128 AND 1013 GROUP BY FirstDepTime, WeatherDelay, DepTimeBlk LIMIT 10000","SELECT FirstDepTime, WeatherDelay, DepTimeBlk, MIN(ArrivalDelayGroups) FROM mytable WHERE WheelsOff BETWEEN 1128 AND 1013 GROUP BY FirstDepTime, WeatherDelay, DepTimeBlk LIMIT 10000"],"pql":"SELECT MAX(WheelsOff), SUM(WheelsOff), MIN(ArrivalDelayGroups) FROM mytable WHERE WheelsOff BETWEEN 1128 AND 1013 GROUP BY FirstDepTime, WeatherDelay, DepTimeBlk TOP 4"}
 {"hsqls":["SELECT Quarter, DivAirportSeqIDs__MV0, DivAirportSeqIDs__MV1, DivAirportSeqIDs__MV2, DivAirportSeqIDs__MV3, DivAirportSeqIDs__MV4, DestCityMarketID FROM mytable WHERE CancellationCode BETWEEN 'A' AND 'C' OR OriginAirportID BETWEEN 15096 AND 14679 ORDER BY DepDelayMinutes LIMIT 10000"],"pql":"SELECT Quarter, DivAirportSeqIDs, DestCityMarketID FROM mytable WHERE CancellationCode BETWEEN 'A' AND 'C' OR OriginAirportID BETWEEN 15096 AND 14679 ORDER BY DepDelayMinutes LIMIT 13"}
 {"hsqls":["SELECT OriginState, Distance, NASDelay, COUNT(*) FROM mytable WHERE NASDelay IN (16, 75) GROUP BY OriginState, Distance, NASDelay LIMIT 10000"],"pql":"SELECT COUNT(*) FROM mytable WHERE NASDelay IN (16, 75) GROUP BY OriginState, Distance, NASDelay TOP 8"}
