Skip to content

Commit

Permalink
Merge 986f73b into 188e7e4
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored Jun 28, 2019
2 parents 188e7e4 + 986f73b commit fb48b33
Show file tree
Hide file tree
Showing 11 changed files with 643 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,47 @@ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] d
int columnValueSize = dimensionDataChunkStore.getColumnValueSize();
int rowsNum = dataLength / columnValueSize;
CarbonColumnVector vector = vectorInfo.vector;
if (!dictionary.isDictionaryUsed()) {
vector.setDictionary(dictionary);
dictionary.setDictionaryUsed();
}
BitSet nullBitset = new BitSet();
CarbonColumnVector dictionaryVector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector.getDictionaryVector(), invertedIndex, nullBitset,
vectorInfo.deletedRows, false, true);
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, invertedIndex, nullBitset, vectorInfo.deletedRows,
false, false);
for (int i = 0; i < rowsNum; i++) {
int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
vector.putNull(i);
dictionaryVector.putNull(i);
} else {
dictionaryVector.putInt(i, surrogate);
if (null == vector.getDictionaryVector()) {
// arrow vector case. Decode the dictionary and write to vector
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, invertedIndex, nullBitset, vectorInfo.deletedRows,
false, false);
for (int i = 0; i < rowsNum; i++) {
int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
vector.putNull(i);
vector.putNull(i);
} else {
vector.putByteArray(i, dictionary.getDictionaryValue(surrogate));
}
}
if (vector instanceof ConvertableVector) {
((ConvertableVector) vector).convert();
}
} else {
if (!dictionary.isDictionaryUsed()) {
vector.setDictionary(dictionary);
dictionary.setDictionaryUsed();
}
CarbonColumnVector dictionaryVector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector.getDictionaryVector(), invertedIndex, nullBitset,
vectorInfo.deletedRows, false, true);
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, invertedIndex, nullBitset, vectorInfo.deletedRows,
false, false);
for (int i = 0; i < rowsNum; i++) {
int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
vector.putNull(i);
dictionaryVector.putNull(i);
} else {
dictionaryVector.putInt(i, surrogate);
}
}
if (dictionaryVector instanceof ConvertableVector) {
((ConvertableVector) dictionaryVector).convert();
}
}
if (dictionaryVector instanceof ConvertableVector) {
((ConvertableVector) dictionaryVector).convert();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.sdk.file.arrow.ArrowConverter;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.hadoop.mapreduce.RecordReader;
Expand Down Expand Up @@ -51,11 +50,11 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
* @throws Exception
*/
public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 0);
while (hasNext()) {
arrowConverter.addToArrowBuffer(readNextBatchRow());
getCurrentReader().getCurrentValue();
}
return arrowConverter.toSerializeArray();
return ((CarbonArrowVectorizedRecordReader) getCurrentReader()).getArrowConverter()
.toSerializeArray();
}

/**
Expand All @@ -69,11 +68,11 @@ public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
* @throws Exception
*/
public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception {
ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 0);
while (hasNext()) {
arrowConverter.addToArrowBuffer(readNextBatchRow());
getCurrentReader().getCurrentValue();
}
return arrowConverter.getArrowVectors();
return ((CarbonArrowVectorizedRecordReader) getCurrentReader()).getArrowConverter()
.getArrowVectors();
}

/**
Expand All @@ -89,11 +88,11 @@ public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception {
* @throws Exception
*/
public long readArrowBatchAddress(Schema carbonSchema) throws Exception {
ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 0);
while (hasNext()) {
arrowConverter.addToArrowBuffer(readNextBatchRow());
getCurrentReader().getCurrentValue();
}
return arrowConverter.copySerializeArrayToOffHeap();
return ((CarbonArrowVectorizedRecordReader) getCurrentReader()).getArrowConverter()
.copySerializeArrayToOffHeap();
}

/**
Expand Down
Loading

0 comments on commit fb48b33

Please sign in to comment.