Skip to content
Permalink
Browse files
[CARBONDATA-4194] Fixed presto read after update/delete from spark
Why is this PR needed?
After an update/delete with Spark on a table that contains an array/struct column,
reading the table from Presto throws a ClassCastException.
This is because, after an update/delete, the table page contains a vector of type
ColumnarVectorWrapperDirectWithDeleteDelta, which we were trying to cast to
CarbonColumnVectorImpl, and this cast is what throws the ClassCastException.
After fixing this (by adding an instanceof check), it started throwing IllegalArgumentException.
It is because:

1. In the case where the local dictionary is enabled, CarbondataPageSource.load calls
ComplexTypeStreamReader.putComplexObject before setting the correct number
of rows (it does not subtract the deleted rows), and this throws an
IllegalArgumentException while building the block for child elements.
2. The position count is wrong in the case of struct. It should subtract
the number of deleted rows in LocalDictDimensionDataChunkStore.fillVector.
This change is not required in the case of array, because the data length of
the array already takes deleted rows into account in
ColumnVectorInfo.getUpdatedPageSizeForChildVector.

What changes were proposed in this PR?
First fixed the ClassCastException by adding an instanceof check in the if block.
Then subtracted the deleted row count before calling ComplexTypeStreamReader.putComplexObject
in DirectCompressCodec.decodeAndFillVector. Also handled deleted rows in the case of struct
in LocalDictDimensionDataChunkStore.fillVector.

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This Closes #4224
  • Loading branch information
nihal0107 authored and akashrn5 committed Oct 28, 2021
1 parent 7d94691 commit 07b41a5382f554646f231e192cf39c8f28302a05
Showing 3 changed files with 32 additions and 18 deletions.
@@ -22,6 +22,7 @@

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
@@ -67,8 +68,16 @@ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] d
int rowsNum = dataLength / columnValueSize;
CarbonColumnVector vector = vectorInfo.vector;
if (vector.getType().isComplexType()) {
if (DataTypes.isStructType(vector.getType())) {
int deletedRow = vectorInfo.deletedRows != null ? vectorInfo.deletedRows.cardinality() : 0;
rowsNum = dataLength - deletedRow;
} else {
// this is not required to be changed in the case of the array because
// datalength of the array already taking care of deleted rows in
// ColumnVectorInfo.getUpdatedPageSizeForChildVector
rowsNum = dataLength;
}
vector = vectorInfo.vectorStack.peek();
rowsNum = dataLength;
CarbonColumnVector sliceVector = vector.getColumnVector();
// use rowsNum as positionCount in order to create dictionary block
sliceVector.setPositionCount(rowsNum);
@@ -87,6 +96,7 @@ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] d
vectorInfo.deletedRows, false, false);
// this check is in case of array of string type
if (vectorInfo.vector.getType().isComplexType()
&& dictionaryVector instanceof CarbonColumnVectorImpl
&& ((CarbonColumnVectorImpl) dictionaryVector).getIntArraySize() < rowsNum) {
((CarbonColumnVectorImpl) dictionaryVector).increaseIntArraySize(rowsNum);
}
@@ -262,12 +262,16 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
CarbonColumnVector parentVector = vectorInfo.vectorStack.peek();
CarbonColumnVectorImpl parentVectorImpl =
(CarbonColumnVectorImpl) (parentVector.getColumnVector());
int deletedRowCount = vectorInfo.deletedRows != null ?
vectorInfo.deletedRows.cardinality() : 0;
// parse the parent page data,
// save the information about number of child in each row in parent vector
if (DataTypes.isStructType(parentVectorImpl.getType())) {
parentVectorImpl.setNumberOfChildElementsForStruct(pageData, pageSize);
parentVectorImpl
.setNumberOfElementsInEachRowForStruct(pageData, pageSize - deletedRowCount);
} else {
parentVectorImpl.setNumberOfChildElementsForArray(pageData, pageSize);
parentVectorImpl
.setNumberOfElementsInEachRowForArray(pageData, pageSize - deletedRowCount);
}
for (CarbonColumnVector childVector : parentVector.getColumnVector().getChildrenVector()) {
// push each child
@@ -79,7 +79,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {

private boolean loaded;

private List<Integer> childElementsForEachRow;
private List<Integer> numberOfChildElementsInEachRow;

private CarbonDictionary localDictionary;

@@ -121,41 +121,41 @@ public void setChildrenVector(List<CarbonColumnVector> childrenVector) {
}

/**
 * Returns the per-row child element counts previously captured from the parent page
 * (filled by setNumberOfElementsInEachRowForArray/Struct).
 */
public List<Integer> getNumberOfChildrenElementsInEachRow() {
  // Diff residue removed: the scraped hunk kept both the old return
  // (childElementsForEachRow) and the new one, producing unreachable code.
  // Only the post-commit field read remains.
  return numberOfChildElementsInEachRow;
}

public void setNumberOfChildElementsInEachRow(List<Integer> childrenElements) {
this.childElementsForEachRow = childrenElements;
public void setNumberOfChildElementsInEachRow(List<Integer> numberOfChildElementsInEachRow) {
this.numberOfChildElementsInEachRow = numberOfChildElementsInEachRow;
}

public void setNumberOfChildElementsForArray(byte[] parentPageData, int pageSize) {
public void setNumberOfElementsInEachRowForArray(byte[] parentPageData, int pageSize) {
// for complex array type, go through parent page to get the child information
ByteBuffer childInfoBuffer = ByteBuffer.wrap(parentPageData);
List<Integer> childElementsForEachRow = new ArrayList<>();
List<Integer> numberOfArrayElementsInEachRow = new ArrayList<>();
// Parent page array data looks like
// number of children in each row [4 byte], Offset [4 byte],
// number of children in each row [4 byte], Offset [4 byte]...
while (pageSize != childElementsForEachRow.size()) {
// get the number of children in current row
childElementsForEachRow.add(childInfoBuffer.getInt());
while (pageSize != numberOfArrayElementsInEachRow.size()) {
// get the number of array elements in current row
numberOfArrayElementsInEachRow.add(childInfoBuffer.getInt());
// skip offset
childInfoBuffer.getInt();
}
setNumberOfChildElementsInEachRow(childElementsForEachRow);
setNumberOfChildElementsInEachRow(numberOfArrayElementsInEachRow);
}

public void setNumberOfChildElementsForStruct(byte[] parentPageData, int pageSize) {
public void setNumberOfElementsInEachRowForStruct(byte[] parentPageData, int pageSize) {
// for complex struct type, go through parent page to get the child information
ByteBuffer childInfoBuffer = ByteBuffer.wrap(parentPageData);
List<Integer> childElementsForEachRow = new ArrayList<>();
List<Integer> numberOfStructElementsInEachRow = new ArrayList<>();
// Parent page struct data looks like
// number of children in each row [2 byte], number of children in each row [2 byte],
// number of children in each row [2 byte], number of children in each row [2 byte]...
while (pageSize != childElementsForEachRow.size()) {
while (pageSize != numberOfStructElementsInEachRow.size()) {
int elements = childInfoBuffer.getShort();
childElementsForEachRow.add(elements);
numberOfStructElementsInEachRow.add(elements);
}
setNumberOfChildElementsInEachRow(childElementsForEachRow);
setNumberOfChildElementsInEachRow(numberOfStructElementsInEachRow);
}

public CarbonDictionary getLocalDictionary() {

0 comments on commit 07b41a5

Please sign in to comment.