Skip to content

Commit

Permalink
Support order-by on BYTES column
Browse files Browse the repository at this point in the history
In order to support order-by on BYTES column everywhere, inside the system we should always use ByteArray (comparable) to store the BYTES value.
Currently BYTES value are stored as byte[], ByteArray or String in different places, which is very confusing and could cause casting errors.

Changes:
- For DisctinctCount, fix the casting issue when ordering on BYTES column
- For selection order-by, order BYTES column using ByteArray instead of String for performance improvement
- Inside Record, always store BYTES as ByteArray for clarity and also performance improvement (avoid expensive deepEquals and deepHashCode)
- On broker side, store BYTES column using ByteArray instead of String for performance improvement
- On broker side, support type compatible merges for all selection queries

No format change on the query results.
TODO: We are still returning String for BYTES column when preserving the type. Consider changing it to byte[].
  • Loading branch information
Jackie-Jiang committed Apr 8, 2020
1 parent aaacf8d commit b5e867a
Show file tree
Hide file tree
Showing 26 changed files with 513 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.spi.utils.ByteArray;


/**
Expand Down Expand Up @@ -63,6 +64,8 @@ byte[] toBytes()

String getString(int rowId, int colId);

ByteArray getBytes(int rowId, int colId);

<T> T getObject(int rowId, int colId);

int[] getIntArray(int rowId, int colId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pinot.core.common;

import java.io.Serializable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;


public class RowBasedBlockValueFetcher {
Expand All @@ -33,9 +33,9 @@ public RowBasedBlockValueFetcher(BlockValSet[] blockValSets) {
}
}

public Serializable[] getRow(int docId) {
public Object[] getRow(int docId) {
int numColumns = _valueFetchers.length;
Serializable[] row = new Serializable[numColumns];
Object[] row = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
row[i] = _valueFetchers[i].getValue(docId);
}
Expand Down Expand Up @@ -80,10 +80,10 @@ private ValueFetcher createFetcher(BlockValSet blockValSet) {
}

private interface ValueFetcher {
Serializable getValue(int docId);
Object getValue(int docId);
}

private class IntSingleValueFetcher implements ValueFetcher {
private static class IntSingleValueFetcher implements ValueFetcher {
private final int[] _values;

IntSingleValueFetcher(int[] values) {
Expand All @@ -95,7 +95,7 @@ public Integer getValue(int docId) {
}
}

private class LongSingleValueFetcher implements ValueFetcher {
private static class LongSingleValueFetcher implements ValueFetcher {
private final long[] _values;

LongSingleValueFetcher(long[] values) {
Expand All @@ -107,7 +107,7 @@ public Long getValue(int docId) {
}
}

private class FloatSingleValueFetcher implements ValueFetcher {
private static class FloatSingleValueFetcher implements ValueFetcher {
private final float[] _values;

FloatSingleValueFetcher(float[] values) {
Expand All @@ -119,7 +119,7 @@ public Float getValue(int docId) {
}
}

private class DoubleSingleValueFetcher implements ValueFetcher {
private static class DoubleSingleValueFetcher implements ValueFetcher {
private final double[] _values;

DoubleSingleValueFetcher(double[] values) {
Expand All @@ -131,7 +131,7 @@ public Double getValue(int docId) {
}
}

private class StringSingleValueFetcher implements ValueFetcher {
private static class StringSingleValueFetcher implements ValueFetcher {
private final String[] _values;

StringSingleValueFetcher(String[] values) {
Expand All @@ -143,19 +143,19 @@ public String getValue(int docId) {
}
}

private class BytesValueFetcher implements ValueFetcher {
private static class BytesValueFetcher implements ValueFetcher {
private final byte[][] _values;

BytesValueFetcher(byte[][] values) {
_values = values;
}

public byte[] getValue(int docId) {
return _values[docId];
public ByteArray getValue(int docId) {
return new ByteArray(_values[docId]);
}
}

private class IntMultiValueFetcher implements ValueFetcher {
private static class IntMultiValueFetcher implements ValueFetcher {
private final int[][] _values;

IntMultiValueFetcher(int[][] values) {
Expand All @@ -167,7 +167,7 @@ public int[] getValue(int docId) {
}
}

private class LongMultiValueFetcher implements ValueFetcher {
private static class LongMultiValueFetcher implements ValueFetcher {
private final long[][] _values;

LongMultiValueFetcher(long[][] values) {
Expand All @@ -179,7 +179,7 @@ public long[] getValue(int docId) {
}
}

private class FloatMultiValueFetcher implements ValueFetcher {
private static class FloatMultiValueFetcher implements ValueFetcher {
private final float[][] _values;

FloatMultiValueFetcher(float[][] values) {
Expand All @@ -191,7 +191,7 @@ public float[] getValue(int docId) {
}
}

private class DoubleMultiValueFetcher implements ValueFetcher {
private static class DoubleMultiValueFetcher implements ValueFetcher {
private final double[][] _values;

DoubleMultiValueFetcher(double[][] values) {
Expand All @@ -203,7 +203,7 @@ public double[] getValue(int docId) {
}
}

private class StringMultiValueFetcher implements ValueFetcher {
private static class StringMultiValueFetcher implements ValueFetcher {
private final String[][] _values;

StringMultiValueFetcher(String[][] values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.ByteArray;


/**
Expand Down Expand Up @@ -74,6 +75,7 @@
// TODO: 1. Fix float size.
// TODO: 2. Use one dictionary for all columns (save space).
// TODO: 3. Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
// TODO: 4. Store bytes as variable size data instead of String
public class DataTableBuilder {
private final DataSchema _dataSchema;
private final int[] _columnOffsets;
Expand Down Expand Up @@ -162,44 +164,32 @@ public void setColumn(int colId, String value) {
_currentRowDataByteBuffer.putInt(dictId);
}

public void setColumn(int colId, Object value)
public void setColumn(int colId, ByteArray value)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
_currentRowDataByteBuffer.putInt(bytes.length);
_variableSizeDataOutputStream.writeInt(objectTypeValue);
_variableSizeDataByteArrayOutputStream.write(bytes);
}
// NOTE: Use String to store bytes value in DataTable V2 for backward-compatibility
setColumn(colId, value.toHexString());

public void setColumn(int colId, byte[] values) {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
_currentRowDataByteBuffer.putInt(values.length);
for (byte value : values) {
_variableSizeDataByteArrayOutputStream.write(value);
}
}
/*
TODO: Store bytes as variable size data instead of String. Make the change for the next version data table for
backward-compatibility
public void setColumn(int colId, char[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
_currentRowDataByteBuffer.putInt(values.length);
for (char value : values) {
_variableSizeDataOutputStream.writeChar(value);
}
byte[] bytes = value.getBytes();
_currentRowDataByteBuffer.putInt(bytes.length);
_variableSizeDataByteArrayOutputStream.write(bytes);
*/
}

public void setColumn(int colId, short[] values)
public void setColumn(int colId, Object value)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
_currentRowDataByteBuffer.putInt(values.length);
for (short value : values) {
_variableSizeDataOutputStream.writeShort(value);
}
int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
_currentRowDataByteBuffer.putInt(bytes.length);
_variableSizeDataOutputStream.writeInt(objectTypeValue);
_variableSizeDataByteArrayOutputStream.write(bytes);
}

public void setColumn(int colId, int[] values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;


public class DataTableImplV2 implements DataTable {
Expand Down Expand Up @@ -388,6 +390,12 @@ public String getString(int rowId, int colId) {
return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
}

@Override
public ByteArray getBytes(int rowId, int colId) {
// NOTE: DataTable V2 uses String to store BYTES value
return BytesUtils.toByteArray(getString(rowId, colId));
}

@Override
public <T> T getObject(int rowId, int colId) {
int size = positionCursorInVariableBuffer(rowId, colId);
Expand Down
37 changes: 20 additions & 17 deletions pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,36 @@


/**
* Defines the key component of the record
* Defines the key component of the record.
* <p>Key can be used as the key in a map, and may only contain single-value columns.
* <p>For each data type, the value should be stored as:
* <ul>
* <li>INT: Integer</li>
* <li>LONG: Long</li>
* <li>FLOAT: Float</li>
* <li>DOUBLE: Double</li>
* <li>STRING: String</li>
* <li>BYTES: ByteArray</li>
* </ul>
*
* TODO: Consider replacing Key with Record as the concept is very close and the implementation is the same
*/
public class Key {
private Object[] _columns;

public Key(Object[] columns) {
_columns = columns;
}
private final Object[] _values;

public Object[] getColumns() {
return _columns;
public Key(Object[] values) {
_values = values;
}

// NOTE: Not check class for performance concern
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object o) {
Key that = (Key) o;
return Arrays.deepEquals(_columns, that._columns);
return Arrays.equals(_values, ((Key) o)._values);
}

@Override
public int hashCode() {
return Arrays.deepHashCode(_columns);
return Arrays.hashCode(_values);
}

@Override
public String toString() {
return Arrays.deepToString(_columns);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,46 @@


/**
* Defines a single record in Pinot
* Defines a single record in Pinot.
* <p>Record may contain both single-value and multi-value columns. In order to use the record as the key in a map, it
* can only contain single-value columns (to avoid using Arrays.deepEquals() and Arrays.deepHashCode() for performance
* concern).
* <p>For each data type, the value should be stored as:
* <ul>
* <li>INT: Integer</li>
* <li>LONG: Long</li>
* <li>FLOAT: Float</li>
* <li>DOUBLE: Double</li>
* <li>STRING: String</li>
* <li>BYTES: ByteArray</li>
* <li>OBJECT (intermediate aggregation result): Object</li>
* <li>INT_ARRAY: int[]</li>
* <li>LONG_ARRAY: long[]</li>
* <li>FLOAT_ARRAY: float[]</li>
* <li>DOUBLE_ARRAY: double[]</li>
* <li>STRING_ARRAY: String[]</li>
* </ul>
*/
public class Record {
private Object[] _values;
private final Object[] _values;

public Record(Object[] values) {
_values = values;
}

/**
* Returns the column values contained in the Record
*/
public Object[] getValues() {
return _values;
}

@Override
public String toString() {
return Arrays.deepToString(_values);
}

// NOTE: Not check class for performance concern
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object o) {
Record that = (Record) o;
return Arrays.deepEquals(_values, that._values);
return Arrays.equals(_values, ((Record) o)._values);
}

@Override
public int hashCode() {
return Arrays.deepHashCode(_values);
return Arrays.hashCode(_values);
}
}

0 comments on commit b5e867a

Please sign in to comment.