Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.core.operator.blocks;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -112,7 +111,7 @@ public DataSchema getDataSchema() {
}

@Nullable
public Collection<Object[]> getRows() {
public List<Object[]> getRows() {
return _resultsBlock != null ? _resultsBlock.getRows(_queryContext) : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
Expand Down Expand Up @@ -65,6 +66,37 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class);

/**
 * Set of column data types that are compatible between V1 and V2 without the need to canonicalize.
 *
 * <p>{@code isDataSchemaColumnTypesIdentical} consults this set: a column whose type is not listed here makes that
 * check return {@code false}. Entries that are commented out below are deliberately excluded; see the TODO next to
 * each group for the reason.
 *
 * @see org.apache.pinot.common.utils.DataSchema.ColumnDataType#convert(Object) for more details.
 */
private static final Set<DataSchema.ColumnDataType> COMPATIBLE_DATA_TYPES = ImmutableSet.of(
DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.LONG,
DataSchema.ColumnDataType.FLOAT,
DataSchema.ColumnDataType.DOUBLE,
DataSchema.ColumnDataType.BIG_DECIMAL,
// TODO: BOOLEAN requires conversion because v1 returns 1 and 0
// DataSchema.ColumnDataType.BOOLEAN,
// TODO: TIMESTAMP requires conversion because v1 returns a long
// DataSchema.ColumnDataType.TIMESTAMP,
DataSchema.ColumnDataType.STRING
// TODO: support for the following data types without conversion is untested
// DataSchema.ColumnDataType.JSON,
// DataSchema.ColumnDataType.BYTES,
// DataSchema.ColumnDataType.OBJECT,
// DataSchema.ColumnDataType.INT_ARRAY,
// DataSchema.ColumnDataType.LONG_ARRAY,
// DataSchema.ColumnDataType.FLOAT_ARRAY,
// DataSchema.ColumnDataType.DOUBLE_ARRAY,
// DataSchema.ColumnDataType.BOOLEAN_ARRAY,
// DataSchema.ColumnDataType.TIMESTAMP_ARRAY,
// DataSchema.ColumnDataType.STRING_ARRAY,
// DataSchema.ColumnDataType.BYTES_ARRAY,
// DataSchema.ColumnDataType.UNKNOWN,
);

private final LinkedList<ServerQueryRequest> _serverQueryRequestQueue;
private final DataSchema _desiredDataSchema;
private final Function<ServerQueryRequest, InstanceResponseBlock> _processCall;
Expand Down Expand Up @@ -177,9 +209,8 @@ private static TransferableBlock composeTransferableBlock(InstanceResponseBlock
/**
 * Validates the response block's schema against the desired schema and then builds the transferable block via
 * {@code composeDirectTransferableBlock}.
 *
 * @param responseBlock response block whose schema and rows are used
 * @param desiredDataSchema schema the resulting block must carry
 * @return the block produced by {@code composeDirectTransferableBlock}
 * @throws IllegalStateException if {@code isDataSchemaColumnTypesCompatible} rejects the response schema
 */
private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
    DataSchema desiredDataSchema) {
  DataSchema resultSchema = responseBlock.getDataSchema();
  // Template overload: the message (and both schema toString() calls) is only rendered when the check fails.
  // NOTE(review): wording says "selection result" and is kept verbatim; confirm whether it should name this result type.
  Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema, resultSchema),
      "Incompatible selection result data schema: Expected: %s. Actual: %s", desiredDataSchema, resultSchema);
  return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
}

Expand All @@ -194,9 +225,8 @@ private static TransferableBlock composeDistinctTransferableBlock(InstanceRespon
/**
 * Validates the response block's schema against the desired schema and then builds the transferable block via
 * {@code composeDirectTransferableBlock}.
 *
 * @param responseBlock response block whose schema and rows are used
 * @param desiredDataSchema schema the resulting block must carry
 * @return the block produced by {@code composeDirectTransferableBlock}
 * @throws IllegalStateException if {@code isDataSchemaColumnTypesCompatible} rejects the response schema
 */
private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
    DataSchema desiredDataSchema) {
  DataSchema resultSchema = responseBlock.getDataSchema();
  // Template overload: the message (and both schema toString() calls) is only rendered when the check fails.
  // NOTE(review): wording says "selection result" and is kept verbatim; confirm whether it should name this result type.
  Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema, resultSchema),
      "Incompatible selection result data schema: Expected: %s. Actual: %s", desiredDataSchema, resultSchema);
  return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
}

Expand All @@ -212,9 +242,8 @@ private static TransferableBlock composeGroupByTransferableBlock(InstanceRespons
/**
 * Validates the response block's schema against the desired schema and then builds the transferable block via
 * {@code composeDirectTransferableBlock}.
 *
 * @param responseBlock response block whose schema and rows are used
 * @param desiredDataSchema schema the resulting block must carry
 * @return the block produced by {@code composeDirectTransferableBlock}
 * @throws IllegalStateException if {@code isDataSchemaColumnTypesCompatible} rejects the response schema
 */
private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
    DataSchema desiredDataSchema) {
  DataSchema resultSchema = responseBlock.getDataSchema();
  // Template overload: the message (and both schema toString() calls) is only rendered when the check fails.
  // NOTE(review): wording says "selection result" and is kept verbatim; confirm whether it should name this result type.
  Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema, resultSchema),
      "Incompatible selection result data schema: Expected: %s. Actual: %s", desiredDataSchema, resultSchema);
  return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
}

Expand All @@ -230,13 +259,13 @@ private static TransferableBlock composeSelectTransferableBlock(InstanceResponse
DataSchema resultSchema = responseBlock.getDataSchema();
List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(),
resultSchema);
// TODO: we should make server return column indices order the same as Calcite
int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
if (!inOrder(columnIndices)) {
DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
adjustedResultSchema.getColumnDataTypes()), "Incompatible selection result data schema: "
+ " Expected: " + desiredDataSchema + ". Actual: " + adjustedResultSchema
+ " Column Ordering: " + Arrays.toString(columnIndices));
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema, adjustedResultSchema),
"Incompatible selection result data schema: Expected: " + desiredDataSchema + ". Actual: "
+ adjustedResultSchema + " Column Ordering: " + Arrays.toString(columnIndices));
return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
} else {
return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
Expand All @@ -252,15 +281,8 @@ private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceR
DataSchema desiredDataSchema, int[] columnIndices) {
Collection<Object[]> resultRows = responseBlock.getRows();
List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
if (resultRows instanceof List) {
for (Object[] row : resultRows) {
extractedRows.add(canonicalizeRow(row, desiredDataSchema, columnIndices));
}
} else if (resultRows instanceof PriorityQueue) {
PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
while (!priorityQueue.isEmpty()) {
extractedRows.add(canonicalizeRow(priorityQueue.poll(), desiredDataSchema, columnIndices));
}
for (Object[] row : resultRows) {
extractedRows.add(canonicalizeRow(row, desiredDataSchema, columnIndices));
}
return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
}
Expand All @@ -273,21 +295,16 @@ private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceR
*/
private static TransferableBlock composeDirectTransferableBlock(InstanceResponseBlock responseBlock,
DataSchema desiredDataSchema) {
Collection<Object[]> resultRows = responseBlock.getRows();
List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
if (resultRows instanceof List) {
if (isDataSchemaColumnTypesIdentical(desiredDataSchema, responseBlock.getDataSchema())) {
return new TransferableBlock(responseBlock.getRows(), desiredDataSchema, DataBlock.Type.ROW);
} else {
Collection<Object[]> resultRows = responseBlock.getRows();
List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
for (Object[] orgRow : resultRows) {
extractedRows.add(canonicalizeRow(orgRow, desiredDataSchema));
}
} else if (resultRows instanceof PriorityQueue) {
PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>) resultRows;
while (!priorityQueue.isEmpty()) {
extractedRows.add(canonicalizeRow(priorityQueue.poll(), desiredDataSchema));
}
} else {
throw new UnsupportedOperationException("Unsupported collection type: " + resultRows.getClass());
return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
}
return new TransferableBlock(extractedRows, desiredDataSchema, DataBlock.Type.ROW);
}

private static boolean inOrder(int[] columnIndices) {
Expand Down Expand Up @@ -334,8 +351,13 @@ private static Object[] canonicalizeRow(Object[] row, DataSchema dataSchema, int
return resultRow;
}

private static boolean isDataSchemaColumnTypesCompatible(DataSchema.ColumnDataType[] desiredTypes,
DataSchema.ColumnDataType[] givenTypes) {
private static boolean isDataSchemaColumnTypesCompatible(DataSchema desiredDataSchema,
@Nullable DataSchema givenDataSchema) {
if (givenDataSchema == null) {
return false;
}
DataSchema.ColumnDataType[] desiredTypes = desiredDataSchema.getColumnDataTypes();
DataSchema.ColumnDataType[] givenTypes = givenDataSchema.getColumnDataTypes();
if (desiredTypes.length != givenTypes.length) {
return false;
}
Expand All @@ -346,4 +368,22 @@ private static boolean isDataSchemaColumnTypesCompatible(DataSchema.ColumnDataTy
}
return true;
}

/**
 * Checks whether two schemas carry the same column data types, position by position, with every one of those
 * types listed in {@code COMPATIBLE_DATA_TYPES}.
 *
 * @param desiredDataSchema schema whose column types are expected
 * @param givenDataSchema schema to compare against; may be {@code null}
 * @return {@code false} when the given schema is {@code null}, the column counts differ, any position holds a
 *     different type, or any desired type is missing from {@code COMPATIBLE_DATA_TYPES}; {@code true} otherwise
 */
private static boolean isDataSchemaColumnTypesIdentical(DataSchema desiredDataSchema,
    @Nullable DataSchema givenDataSchema) {
  if (givenDataSchema == null) {
    return false;
  }

  DataSchema.ColumnDataType[] expectedTypes = desiredDataSchema.getColumnDataTypes();
  DataSchema.ColumnDataType[] actualTypes = givenDataSchema.getColumnDataTypes();
  int numColumns = expectedTypes.length;
  if (numColumns != actualTypes.length) {
    return false;
  }

  int column = 0;
  for (DataSchema.ColumnDataType expectedType : expectedTypes) {
    boolean sameType = expectedType == actualTypes[column++];
    // Both conditions must hold: equal types alone are not enough if the type still needs conversion.
    if (!(sameType && COMPATIBLE_DATA_TYPES.contains(expectedType))) {
      return false;
    }
  }
  return true;
}
}