Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-3083] Fixed data mismatch issue after update #2902

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,17 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
DataType pageDataType = columnPage.getDataType();
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
vector = ColumnarVectorWrapperDirectFactory
CarbonColumnVector underlyingVector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true, false);
fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
// check if a new columnvector object is created due to invertedIndex or deleted rows
// present in the table. If this is the case then we cannot fill the vector directly we
// need to check the vector for null values(already done by putInt/putFloat instead of
// putInts/putFloats).
boolean isUnderlyingVectorPresent = vector != underlyingVector;
vector = underlyingVector;
fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo,
isUnderlyingVectorPresent);
if (deletedRows == null || deletedRows.isEmpty()) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
Expand All @@ -296,7 +303,8 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
}

private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo,
boolean isUnderlyingVectorPresent) {
if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
byte[] byteData = columnPage.getBytePage();
if (vectorDataType == DataTypes.SHORT) {
Expand All @@ -316,7 +324,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
vector.putLong(i, (long) byteData[i] * 1000);
}
} else if (vectorDataType == DataTypes.BOOLEAN) {
vector.putBytes(0, pageSize, byteData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putByte(i, byteData[i]);
}
} else {
vector.putBytes(0, pageSize, byteData, 0);
}
} else if (DataTypes.isDecimal(vectorDataType)) {
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
Expand All @@ -328,7 +342,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
} else if (pageDataType == DataTypes.SHORT) {
short[] shortData = columnPage.getShortPage();
if (vectorDataType == DataTypes.SHORT) {
vector.putShorts(0, pageSize, shortData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putShort(i, shortData[i]);
}
} else {
vector.putShorts(0, pageSize, shortData, 0);
}
} else if (vectorDataType == DataTypes.INT) {
for (int i = 0; i < pageSize; i++) {
vector.putInt(i, (int) shortData[i]);
Expand Down Expand Up @@ -380,7 +400,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
} else if (pageDataType == DataTypes.INT) {
int[] intData = columnPage.getIntPage();
if (vectorDataType == DataTypes.INT) {
vector.putInts(0, pageSize, intData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putInt(i, intData[i]);
}
} else {
vector.putInts(0, pageSize, intData, 0);
}
} else if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, intData[i]);
Expand All @@ -400,7 +426,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
} else if (pageDataType == DataTypes.LONG) {
long[] longData = columnPage.getLongPage();
if (vectorDataType == DataTypes.LONG) {
vector.putLongs(0, pageSize, longData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, longData[i]);
}
} else {
vector.putLongs(0, pageSize, longData, 0);
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, longData[i] * 1000);
Expand All @@ -411,7 +443,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
}
} else {
double[] doubleData = columnPage.getDoublePage();
vector.putDoubles(0, pageSize, doubleData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putDouble(i, doubleData[i]);
}
} else {
vector.putDoubles(0, pageSize, doubleData, 0);
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,17 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
DataType pageDataType = columnPage.getDataType();
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
vector = ColumnarVectorWrapperDirectFactory
CarbonColumnVector underlyingVector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true, false);
fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
// check if a new columnvector object is created due to invertedIndex or deleted rows
// present in the table. If this is the case then we cannot fill the vector directly we
// need to check the vector for null values(already done by putInt/putFloat instead of
// putInts/putFloats).
boolean isUnderlyingVectorPresent = vector != underlyingVector;
vector = underlyingVector;
fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo,
isUnderlyingVectorPresent);
if (deletedRows == null || deletedRows.isEmpty()) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
Expand All @@ -225,7 +232,8 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
}

private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo,
boolean isUnderlyingVectorPresent) {
if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
byte[] byteData = columnPage.getBytePage();
if (vectorDataType == DataTypes.SHORT) {
Expand Down Expand Up @@ -257,7 +265,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
} else if (pageDataType == DataTypes.SHORT) {
short[] shortData = columnPage.getShortPage();
if (vectorDataType == DataTypes.SHORT) {
vector.putShorts(0, pageSize, shortData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putShort(i, shortData[i]);
}
} else {
vector.putShorts(0, pageSize, shortData, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using putShorts/putFloats is common and unavoidable. In future also any new encoding class can make use of these method and then again the same problem can occur. Is it feasible to modify the vector classes implementation methods itself just like an example below
public void putShorts(int rowId, int count, short[] src, int srcIndex) { for (int i = srcIndex; i < count; i++) { putShort(rowId++, src[i]); } }
This way it will be better

}
} else if (vectorDataType == DataTypes.INT) {
for (int i = 0; i < pageSize; i++) {
vector.putInt(i, (int) shortData[i]);
Expand Down Expand Up @@ -309,7 +323,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
} else if (pageDataType == DataTypes.INT) {
int[] intData = columnPage.getIntPage();
if (vectorDataType == DataTypes.INT) {
vector.putInts(0, pageSize, intData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putInt(i, intData[i]);
}
} else {
vector.putInts(0, pageSize, intData, 0);
}
} else if (vectorDataType == DataTypes.LONG) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, intData[i]);
Expand All @@ -329,7 +349,13 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
} else if (pageDataType == DataTypes.LONG) {
long[] longData = columnPage.getLongPage();
if (vectorDataType == DataTypes.LONG) {
vector.putLongs(0, pageSize, longData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, longData[i]);
}
} else {
vector.putLongs(0, pageSize, longData, 0);
}
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < pageSize; i++) {
vector.putLong(i, longData[i] * 1000);
Expand All @@ -344,10 +370,22 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
columnPage.getNullBits());
} else if (vectorDataType == DataTypes.FLOAT) {
float[] floatPage = columnPage.getFloatPage();
vector.putFloats(0, pageSize, floatPage, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putFloat(i, floatPage[i]);
}
} else {
vector.putFloats(0, pageSize, floatPage, 0);
}
} else {
double[] doubleData = columnPage.getDoublePage();
vector.putDoubles(0, pageSize, doubleData, 0);
if (isUnderlyingVectorPresent) {
for (int i = 0; i < pageSize; i++) {
vector.putDouble(i, doubleData[i]);
}
} else {
vector.putDoubles(0, pageSize, doubleData, 0);
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-30000,aaa,-300
0,ddd,0
-20000,bbb,-200
70000,ggg,700
10000,eee,100,
-10000,ccc,-100,
null,null,null
Original file line number Diff line number Diff line change
Expand Up @@ -772,12 +772,33 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("""drop table if exists iud.dest33_part""")
}

test("check data after update with row.filter pushdown as false") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants
.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR, "false")
sql("""drop table if exists iud.dest33_flat""")
sql(
"""create table iud.dest33_part (c1 int,c2 string, c3 short) STORED BY 'carbondata'"""
.stripMargin)
sql(
s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/negativevalue.csv' INTO table iud
|.dest33_part options('header'='false')""".stripMargin)
sql(
"""update iud.dest33_part d set (c1) = (5) where d.c1 = 0""".stripMargin).show()
checkAnswer(sql("select c3 from iud.dest33_part"), Seq(Row(-300), Row(0), Row(-200), Row(700)
, Row(100), Row(-100), Row(null)))
sql("""drop table if exists iud.dest33_part""")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants
.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR, "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After test case completion we should set the default value for CARBON_PUSH_ROW_FILTERS_FOR_VECTOR?...default property is false so I think at the start of test case no need to modify the property value

}

override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants
.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR, "false")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of hard coding "false" use default value from constants

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val PUSHED_FILTERS = "PushedFilters"

val vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
def vectorPushRowFilters: Boolean = CarbonProperties.getInstance().isPushRowFiltersForVector

/*
Spark 2.3.1 plan there can be case of multiple projections like below
Expand Down