Fixed data mismatch for large skewed data
manishnalla1994 committed May 6, 2019
1 parent 2da1899 commit 130965e
Showing 3 changed files with 142 additions and 16 deletions (TestRangeColumnDataLoad.scala, CarbonMergerRDD.scala, and CarbonCompactionUtil.java).
TestRangeColumnDataLoad.scala

@@ -235,7 +235,7 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
 
   test("Test compaction for range_column - INT Datatype with null values") {
     deleteFile(filePath3)
-    createFile(filePath3, 1000, 3)
+    createFile(filePath3, 2000, 3)
     sql("DROP TABLE IF EXISTS carbon_range_column1")
     sql(
       """
@@ -251,6 +251,12 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
     sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
         "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
 
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
     var res = sql("select * from carbon_range_column1").collect()
 
     sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
@@ -648,7 +654,7 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   }
 
-  test("Test compaction for range_column - TIMESTAMP Datatype") {
+  test("Test compaction for range_column - TIMESTAMP Datatype skewed data") {
     deleteFile(filePath2)
     createFile(filePath2, 12, 1)
     sql("DROP TABLE IF EXISTS carbon_range_column1")
@@ -658,7 +664,7 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
       """
        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age TIMESTAMP)
        | STORED BY 'org.apache.carbondata.format'
-       | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='age, city' ,
+       | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='city' ,
        | 'range_column'='age')
       """.stripMargin)
 
@@ -852,9 +858,13 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
     } else if (1 == lastCol) {
       // Timestamp data generation
       for (i <- start until (start + line)) {
-        write
-          .println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i) + "-10-10 " +
-                   "00:00:00")
+        if (i == start) {
+          write
+            .println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i) + "-10-10 " +
+                     "00:00:00")
+        } else {
+          write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + ",")
+        }
       }
     } else if (2 == lastCol) {
       // Float data generation
@@ -865,8 +875,13 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
     } else if (3 == lastCol) {
       // Null data generation
       for (i <- start until (start + line)) {
-        write
-          .println(i + "," + "," + "c" + (i % 10000) + "," + (1990 + i))
+        if (i % 3 != 0) {
+          write
+            .println(i + "," + "," + "c" + (i % 10000) + "," + (1990 + i))
+        } else {
+          write
+            .println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+        }
       }
     } else if (4 <= lastCol && 6 >= lastCol) {
       // No overlap data generation 1
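These two generator changes are what make the test data skewed: in the TIMESTAMP case only the first row of a generated block carries a timestamp and every other row leaves the range column null, while in the null-data case one row in three now carries a name where previously all names were null. A sketch of the TIMESTAMP rows, assuming createFile(filePath2, 12, 1) with start = 0:

    // Sketch only: the rows the modified generator emits for the TIMESTAMP case.
    val rows = (0 until 12).map { i =>
      if (i == 0) s"$i,n$i,c${i % 10000},${1990 + i}-10-10 00:00:00"
      else s"$i,n$i,c${i % 10000},"
    }
    // rows.head == "0,n0,c0,1990-10-10 00:00:00"; rows(1) == "1,n1,c1,"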
CarbonMergerRDD.scala

@@ -53,7 +53,7 @@ import org.apache.carbondata.core.scan.expression
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -63,7 +63,7 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
+import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class CarbonMergerRDD[K, V](
@@ -395,12 +395,21 @@ class CarbonMergerRDD[K, V](
       val numOfPartitions = Math
         .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, DataLoadProcessBuilderOnSpark
           .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel))
-      LOGGER.info(s"Compacting on range column: $rangeColumn")
+      val colName = rangeColumn.getColName
+      LOGGER.info(s"Compacting on range column: $colName")
       allRanges = getRangesFromRDD(rangeColumn,
         carbonTable,
         numOfPartitions,
         allSplits,
         dataType)
+      // If RangePartitioner does not give ranges in the case when the data is skewed with
+      // a lot of null records then we take the min/max from footer and set them for tasks
+      if (null == allRanges || (allRanges.size == 1 && allRanges(0) == null)) {
+        allRanges = CarbonCompactionUtil.getOverallMinMax(carbonInputSplits.toList.toArray,
+          rangeColumn,
+          isRangeColSortCol)
+      }
+      LOGGER.info(s"Number of ranges:" + allRanges.size)
     }
 
     // prepare the details required to extract the segment properties using last segment.
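This guard is the heart of the fix: with heavily skewed, mostly-null data, the sampling-based DataSkewRangePartitioner can return no usable bounds (null, or a single null bound), which appears to be what produced the data mismatch the commit title describes. In that case the ranges now come from the min/max recorded in the carbondata file footers instead. A minimal sketch of the pattern, with a hypothetical footerMinMax standing in for CarbonCompactionUtil.getOverallMinMax:

    // Sketch only: prefer the sampled bounds; fall back to footer min/max when
    // the sample produced nothing usable (null, or a single null bound).
    def effectiveRanges(sampled: Array[Object], footerMinMax: => Array[Object]): Array[Object] = {
      if (sampled == null || (sampled.length == 1 && sampled(0) == null)) footerMinMax
      else sampled
    }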
@@ -594,18 +603,39 @@ class CarbonMergerRDD[K, V](
       inputMetricsStats,
       partitionNames = null,
       splits = allSplits)
-    val objectOrdering: Ordering[Object] = DataLoadProcessBuilderOnSpark
-      .createOrderingForColumn(rangeColumn, true)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn, true)
     val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
     // Change string type to support all types
     val sampleRdd = scanRdd
       .map(row => (row.get(0, sparkDataType), null))
-    val sortedRdd = sampleRdd.sortBy(_._1, true)(objectOrdering, classTag[AnyRef])
     val value = new DataSkewRangePartitioner(
-      defaultParallelism, sortedRdd, true)(objectOrdering, classTag[Object])
+      defaultParallelism, sampleRdd, true)(objectOrdering, classTag[Object])
     value.rangeBounds
   }
 
+  private def createOrderingForColumn(column: CarbonColumn,
+      mergerRDDFlag: Boolean): Ordering[Object] = {
+    if (column.isDimension) {
+      val dimension = column.asInstanceOf[CarbonDimension]
+      if ((dimension.isGlobalDictionaryEncoding || dimension.isDirectDictionaryEncoding) &&
+          dimension.getDataType != DataTypes.TIMESTAMP) {
+        new PrimtiveOrdering(DataTypes.INT)
+      } else {
+        if (DataTypeUtil.isPrimitiveColumn(column.getDataType)) {
+          new PrimtiveOrdering(column.getDataType)
+        } else {
+          if (mergerRDDFlag) {
+            new StringOrdering()
+          } else {
+            new ByteArrayOrdering()
+          }
+        }
+      }
+    } else {
+      new PrimtiveOrdering(column.getDataType)
+    }
+  }
+
   private def getTaskNo(
       split: CarbonInputSplit,
       partitionTaskMap: util.Map[PartitionSpec, String],
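Two things change in this hunk. First, the pre-sort of sampleRdd is gone: a range partitioner only samples its input to compute bounds, so sorting the full RDD beforehand cost a shuffle without (in principle) changing the resulting bounds. Second, the ordering now comes from a local createOrderingForColumn, which picks a comparator matched to how the range column's keys are materialized: dictionary and direct-dictionary dimensions (other than TIMESTAMP) compare their INT surrogate keys, primitive columns compare natively, and the remaining dimensions compare as strings inside the merger (which sees decoded rows) but as byte arrays elsewhere. A standalone sketch of the same dispatch, using simplified stand-in types rather than Carbon's classes:

    // Sketch only: stand-in kinds instead of CarbonColumn/CarbonDimension.
    sealed trait ColKind
    case object DictDim extends ColKind      // dictionary/direct-dictionary dimension
    case object PrimitiveCol extends ColKind // e.g. a LONG measure
    case object OtherDim extends ColKind     // plain no-dictionary dimension

    def orderingFor(kind: ColKind, inMerger: Boolean): Ordering[Any] = kind match {
      case DictDim      => Ordering.by((v: Any) => v.asInstanceOf[Int])  // surrogate key
      case PrimitiveCol => Ordering.by((v: Any) => v.asInstanceOf[Long])
      case OtherDim if inMerger => Ordering.by((v: Any) => v.asInstanceOf[String])
      case OtherDim =>
        // Lexicographic unsigned byte comparison, as a ByteArrayOrdering would do.
        new Ordering[Any] {
          def compare(a: Any, b: Any): Int = {
            val x = a.asInstanceOf[Array[Byte]]
            val y = b.asInstanceOf[Array[Byte]]
            var i = 0
            while (i < math.min(x.length, y.length)) {
              val c = (x(i) & 0xFF) - (y(i) & 0xFF)
              if (c != 0) return c
              i += 1
            }
            x.length - y.length
          }
        }
    }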
CarbonCompactionUtil.java

@@ -31,6 +31,7 @@
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -46,8 +47,11 @@
 import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.log4j.Logger;
@@ -494,7 +498,6 @@ public static Expression getFilterExpressionForRange(CarbonColumn rangeColumn, O
           new LiteralExpression(maxVal, dataType));
       if (rangeColumn.hasEncoding(Encoding.DICTIONARY)) {
         exp2.setAlreadyResolved(true);
-        exp1.setAlreadyResolved(true);
       }
       finalExpr = new OrExpression(exp1, exp2);
     }
@@ -520,6 +523,84 @@ public static Expression getFilterExpressionForRange(CarbonColumn rangeColumn, O
     return finalExpr;
   }
 
+  public static Object[] getOverallMinMax(CarbonInputSplit[] carbonInputSplits,
+      CarbonColumn rangeCol, boolean isSortCol) {
+    byte[] minVal = null;
+    byte[] maxVal = null;
+    int dictMinVal = Integer.MAX_VALUE;
+    int dictMaxVal = Integer.MIN_VALUE;
+    int idx = -1;
+    DataType dataType = rangeCol.getDataType();
+    Object[] minMaxVals = new Object[2];
+    boolean isDictEncode = rangeCol.hasEncoding(Encoding.DICTIONARY);
+    try {
+      for (CarbonInputSplit split : carbonInputSplits) {
+        DataFileFooter dataFileFooter = null;
+        dataFileFooter =
+            CarbonUtil.readMetadataFile(CarbonInputSplit.getTableBlockInfo(split), true);
+
+        if (-1 == idx) {
+          List<ColumnSchema> allColumns = dataFileFooter.getColumnInTable();
+          for (int i = 0; i < allColumns.size(); i++) {
+            if (allColumns.get(i).getColumnName().equalsIgnoreCase(rangeCol.getColName())) {
+              idx = i;
+              break;
+            }
+          }
+        }
+        if (isDictEncode) {
+          byte[] tempMin = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMinValues()[idx];
+          int tempMinVal = CarbonUtil.getSurrogateInternal(tempMin, 0, tempMin.length);
+          byte[] tempMax = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues()[idx];
+          int tempMaxVal = CarbonUtil.getSurrogateInternal(tempMax, 0, tempMax.length);
+          if (dictMinVal > tempMinVal) {
+            dictMinVal = tempMinVal;
+          }
+          if (dictMaxVal < tempMaxVal) {
+            dictMaxVal = tempMaxVal;
+          }
+        } else {
+          if (null == minVal) {
+            minVal = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMinValues()[idx];
+            maxVal = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues()[idx];
+          } else {
+            byte[] tempMin = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMinValues()[idx];
+            byte[] tempMax = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues()[idx];
+            if (ByteUtil.compare(tempMin, minVal) <= 0) {
+              minVal = tempMin;
+            }
+            if (ByteUtil.compare(tempMax, maxVal) >= 0) {
+              maxVal = tempMax;
+            }
+          }
+        }
+      }
+
+      // Based on how min/max value is stored in the footer we change the data
+      if (isDictEncode) {
+        minMaxVals[0] = dictMinVal;
+        minMaxVals[1] = dictMaxVal;
+      } else {
+        if (!isSortCol && (dataType == DataTypes.INT || dataType == DataTypes.LONG)) {
+          minMaxVals[0] = ByteUtil.toLong(minVal, 0, minVal.length);
+          minMaxVals[1] = ByteUtil.toLong(maxVal, 0, maxVal.length);
+        } else if (dataType == DataTypes.DOUBLE) {
+          minMaxVals[0] = ByteUtil.toDouble(minVal, 0, minVal.length);
+          minMaxVals[1] = ByteUtil.toDouble(maxVal, 0, maxVal.length);
+        } else {
+          minMaxVals[0] =
+              DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minVal, dataType, true);
+          minMaxVals[1] =
+              DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxVal, dataType, true);
+        }
+      }
+
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+    return minMaxVals;
+  }
+
   /**
    * Returns if the DataFileFooter containing carbondata file contains
    * sorted data or not.
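The array returned here plugs straight into allRanges in the CarbonMergerRDD hunk above, so when sampling finds only nulls every compaction task works against the one overall [min, max] range. For a no-dictionary LONG range column that is not a sort column, the footer min/max decode as fixed-width big-endian longs; a hedged sketch of that branch, using plain java.nio as a stand-in for ByteUtil.toLong:

    // Sketch only: decode footer min/max bytes the way the LONG branch above
    // does, assuming big-endian fixed-width encoding (ByteUtil.toLong-style).
    import java.nio.ByteBuffer

    def toLong(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong

    val minBytes = ByteBuffer.allocate(8).putLong(1L).array()    // hypothetical footer min
    val maxBytes = ByteBuffer.allocate(8).putLong(6000L).array() // hypothetical footer max
    val minMaxVals: Array[Any] = Array(toLong(minBytes), toLong(maxBytes))
    // yields Array(1L, 6000L): one overall range shared by all compaction tasks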
