[CARBONDATA-3237] Fix presto carbon issues in dictionary include scenario

Problem 1: A decimal column with DICTIONARY_INCLUDE cannot be read in Presto.
Cause: For dictionary columns, the decimal stream reader typecasts the int surrogate key to decimal.
Solution: Keep the original data type as well as the decimal data type in the decimal stream reader.
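A minimal standalone sketch of the underlying issue, with a plain java.util.Map standing in for Carbon's forward dictionary (the class and variable names below are illustrative, not CarbonData APIs):

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch: a DICTIONARY_INCLUDE decimal column stores int surrogate
// keys in its pages, so the reader must resolve the key through the forward
// dictionary instead of typecasting the int itself to a decimal value.
public class DecimalDictionaryDecodeSketch {
  public static void main(String[] args) {
    // Plain Map standing in for Carbon's forward dictionary (illustrative only).
    Map<Integer, String> forwardDictionary = new HashMap<>();
    forwardDictionary.put(2, "1234.5600");

    int surrogateKey = 2;                                // value held in the column page
    BigDecimal wrong = new BigDecimal(surrogateKey);     // 2: typecasting the key is meaningless
    BigDecimal right = new BigDecimal(forwardDictionary.get(surrogateKey)); // 1234.5600
    System.out.println(wrong + " vs " + right);
  }
}
```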

Problem 2: Optimize Presto query time for DICTIONARY_INCLUDE string columns.
Cause: Presto-carbon builds a dictionary block for string columns on every query; when cardinality is high, the build takes significant time.
Solution: A dictionary block is not required; values can be resolved with a normal dictionary lookup instead.
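A minimal standalone sketch of the trade-off, again with a plain Map standing in for the forward dictionary (illustrative names only, not CarbonData or Presto APIs): building a dictionary block up front touches every dictionary entry on every query, while a per-row lookup only touches the surrogate keys that actually appear in the scanned batch.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the cost difference (illustrative only).
public class StringDictionaryLookupSketch {
  public static void main(String[] args) {
    // Plain Map standing in for the forward dictionary of a high-cardinality column.
    Map<Integer, String> forwardDictionary = new HashMap<>();
    for (int key = 1; key <= 1_000_000; key++) {
      forwardDictionary.put(key, "value-" + key);
    }

    // Old approach (removed by this commit): copy all 1,000,000 dictionary values
    // into a Presto dictionary block before reading a single row, on every query.

    // New approach: decode only the surrogate keys that appear in the scanned batch.
    int[] batchSurrogateKeys = {42, 7, 42, 99};
    for (int key : batchSurrogateKeys) {
      String decoded = forwardDictionary.get(key);       // normal dictionary lookup
      System.out.println(decoded);                       // a real reader writes this into the block
    }
  }
}
```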

This closes #3055
ajantha-bhat authored and kumarvishal09 committed Jan 9, 2019
1 parent 1b45c41 commit 8e6def9
Showing 5 changed files with 47 additions and 50 deletions.
CarbonVectorBatch.java
@@ -37,8 +37,6 @@
import org.apache.carbondata.presto.readers.SliceStreamReader;
import org.apache.carbondata.presto.readers.TimestampStreamReader;

-import com.facebook.presto.spi.block.Block;

public class CarbonVectorBatch {

private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
@@ -63,8 +61,7 @@ private CarbonVectorBatch(StructField[] schema, CarbonDictionaryDecodeReadSupport
DataType[] dataTypes = readSupport.getDataTypes();

for (int i = 0; i < schema.length; ++i) {
-columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i],
-    readSupport.getDictionaryBlock(i));
+columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i]);
}
}

@@ -79,7 +76,7 @@ public static CarbonVectorBatch allocate(StructField[] schema,
}

private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
-    StructField field, Dictionary dictionary, Block dictionaryBlock) {
+    StructField field, Dictionary dictionary) {
if (dataType == DataTypes.BOOLEAN) {
return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
} else if (dataType == DataTypes.SHORT) {
@@ -93,9 +90,10 @@ private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType
} else if (dataType == DataTypes.DOUBLE) {
return new DoubleStreamReader(batchSize, field.getDataType(), dictionary);
} else if (dataType == DataTypes.STRING) {
-return new SliceStreamReader(batchSize, field.getDataType(), dictionaryBlock);
+return new SliceStreamReader(batchSize, field.getDataType(), dictionary);
} else if (DataTypes.isDecimal(dataType)) {
-return new DecimalSliceStreamReader(batchSize, (DecimalType) field.getDataType(), dictionary);
+return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
+    dictionary);
} else {
return new ObjectStreamReader(batchSize, field.getDataType());
}
DecimalSliceStreamReader.java
@@ -24,6 +24,7 @@
import static java.math.RoundingMode.HALF_UP;

import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -57,10 +58,12 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
protected BlockBuilder builder;
private Dictionary dictionary;

-public DecimalSliceStreamReader(int batchSize,
-    org.apache.carbondata.core.metadata.datatype.DecimalType dataType, Dictionary dictionary) {
+public DecimalSliceStreamReader(int batchSize, DataType dataType,
+    org.apache.carbondata.core.metadata.datatype.DecimalType decimalDataType,
+    Dictionary dictionary) {
super(batchSize, dataType);
-this.type = DecimalType.createDecimalType(dataType.getPrecision(), dataType.getScale());
+this.type =
+    DecimalType.createDecimalType(decimalDataType.getPrecision(), decimalDataType.getScale());
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
SliceStreamReader.java
@@ -17,11 +17,17 @@

package org.apache.carbondata.presto.readers;

import java.nio.charset.Charset;
import java.util.Objects;
import java.util.Optional;

import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.core.util.DataTypeUtil;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
@@ -44,27 +50,31 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV

protected BlockBuilder builder;

-int[] values;

private Block dictionaryBlock;

+private boolean isLocalDict;
+
+private Dictionary globalDictionary;

public SliceStreamReader(int batchSize, DataType dataType,
-    Block dictionaryBlock) {
+    Dictionary dictionary) {
super(batchSize, dataType);
+this.globalDictionary = dictionary;
this.batchSize = batchSize;
-if (dictionaryBlock == null) {
-  this.builder = type.createBlockBuilder(null, batchSize);
-} else {
-  this.dictionaryBlock = dictionaryBlock;
-  this.values = new int[batchSize];
-}
+this.builder = type.createBlockBuilder(null, batchSize);
}

@Override public Block buildBlock() {
if (dictionaryBlock == null) {
return builder.build();
} else {
-return new DictionaryBlock(batchSize, dictionaryBlock, values);
+int[] dataArray;
+if (isLocalDict) {
+  dataArray = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray();
+} else {
+  dataArray = (int[]) getDataArray();
+}
+return new DictionaryBlock(batchSize, dictionaryBlock, dataArray);
}
}

@@ -95,22 +105,13 @@ public SliceStreamReader(int batchSize, DataType dataType,
dictOffsets[dictOffsets.length - 1] = size;
dictionaryBlock = new VariableWidthBlock(dictionary.getDictionarySize(),
Slices.wrappedBuffer(singleArrayDictValues), dictOffsets, Optional.of(nulls));
-values = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray();
+this.isLocalDict = true;
}

@Override public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

-@Override public void putInt(int rowId, int value) {
-  values[rowId] = value;
-}
-
-@Override public void putInts(int rowId, int count, int value) {
-  for (int i = 0; i < count; i++) {
-    values[rowId++] = value;
-  }
-}

@Override public void putByteArray(int rowId, byte[] value) {
type.writeSlice(builder, wrappedBuffer(value));
@@ -142,5 +143,17 @@ public SliceStreamReader(int batchSize, DataType dataType,

@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
+this.isLocalDict = false;
}

+@Override public void putInt(int rowId, int value) {
+  Object data = DataTypeUtil
+      .getDataBasedOnDataType(globalDictionary.getDictionaryValueForKey(value), DataTypes.STRING);
+  if (Objects.isNull(data)) {
+    builder.appendNull();
+  } else {
+    type.writeSlice(builder, wrappedBuffer(
+        ((String) data).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))));
+  }
+}
}
CarbonDictionaryDecodeReadSupport.scala
@@ -37,7 +37,6 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
private var dictionaries: Array[Dictionary] = _
private var dataTypes: Array[DataType] = _
-private var dictionaryBlock: Array[Block] = _

/**
* This initialization is done inside executor task
@@ -50,7 +49,6 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {

dictionaries = new Array[Dictionary](carbonColumns.length)
dataTypes = new Array[DataType](carbonColumns.length)
-dictionaryBlock = new Array[Block](carbonColumns.length)

carbonColumns.zipWithIndex.foreach {
case (carbonColumn, index) => if (carbonColumn.hasEncoding(Encoding.DICTIONARY) &&
@@ -66,13 +64,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
dictionaries(index) = forwardDictionaryCache
.get(new DictionaryColumnUniqueIdentifier(carbonTable.getAbsoluteTableIdentifier,
carbonColumn.getColumnIdentifier, dataTypes(index), dictionaryPath))
-  // in case of string data type create dictionarySliceArray same as that of presto code
-  if (dataTypes(index).equals(DataTypes.STRING)) {
-    dictionaryBlock(index) = createDictionaryBlock(dictionaries(index))
-  }
-}
-
-else {
+} else {
dataTypes(index) = carbonColumn.getDataType
}
}
@@ -87,7 +79,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
*/
private def createDictionaryBlock(dictionaryData: Dictionary): Block = {
val chunks: DictionaryChunksWrapper = dictionaryData.getDictionaryChunks
-val positionCount = chunks.getSize;
+val positionCount = chunks.getSize

// In dictionary there will be only one null and the key value will be 1 by default in carbon,
// hence the isNullVector will be populated only once with null value it has no bearing on
@@ -127,16 +119,6 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
throw new RuntimeException("UnSupported Method")
}

-/**
- * Function to get the SliceArrayBlock with dictionary Data
- *
- * @param columnNo
- * @return
- */
-def getDictionaryBlock(columnNo: Int): Block = {
-  dictionaryBlock(columnNo)
-}

def getDictionaries: Array[Dictionary] = {
dictionaries
}
CarbonDataStoreCreator.scala
@@ -239,6 +239,7 @@ object CarbonDataStoreCreator {
bonus.setDataType(DataTypes.createDecimalType(10, 4))
bonus.setPrecision(10)
bonus.setScale(4)
+bonus.setEncodingList(dictionaryEncoding)
bonus.setEncodingList(invertedIndexEncoding)
bonus.setColumnUniqueId(UUID.randomUUID().toString)
bonus.setDimensionColumn(false)
