Skip to content

Commit

Permalink
Merge e29eaac into 157de1d
Browse files Browse the repository at this point in the history
  • Loading branch information
manishnalla1994 committed May 8, 2019
2 parents 157de1d + e29eaac commit cbab084
Showing 1 changed file with 109 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ class CarbonMergerRDD[K, V](
}
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[Object] = new Array[Object](0)
var singleRange = false
if (rangeColumn != null) {
// To calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
val numOfPartitions = Math
Expand All @@ -400,10 +401,14 @@ class CarbonMergerRDD[K, V](
dataType)
// If RangePartitioner does not give ranges in the case when the data is skewed with
// a lot of null records then we take the min/max from footer and set them for tasks
if (null == allRanges || (allRanges.size == 1 && allRanges(0) == null)) {
if (null == allRanges || allRanges.size == 1) {
allRanges = CarbonCompactionUtil.getOverallMinMax(carbonInputSplits.toList.toArray,
rangeColumn,
isRangeColSortCol)
if(allRanges(0) == allRanges(1)) {
// This will be true only if data has single values throughout
singleRange = true
}
}
LOGGER.info(s"Number of ranges:" + allRanges.size)
}
Expand Down Expand Up @@ -433,75 +438,118 @@ class CarbonMergerRDD[K, V](
val newRanges = allRanges.filter { range =>
range != null
}
carbonInputSplits.foreach { split =>
var dataFileFooter: DataFileFooter = null
if (null == rangeColumn) {
val taskNo = getTaskNo(split, partitionTaskMap, counter)
var sizeOfSplit = split.getDetailInfo.getBlockSize
val splitList = taskIdMapping.get(taskNo)
noOfBlocks += 1
val noOfSplitsPerTask = Math.ceil(carbonInputSplits.size / defaultParallelism)
var taskCount = 0
// In case of range column if only one data value is present then we try to
// divide the splits to different tasks in order to avoid single task creation
// and load on single executor
if (singleRange) {
var filterExpr = CarbonCompactionUtil
.getFilterExpressionForRange(rangeColumn,
null, allRanges(0), dataType)
if (null == expressionMapForRangeCol) {
expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
}
expressionMapForRangeCol.put(taskCount, filterExpr)
carbonInputSplits.foreach { split =>
var dataFileFooter: DataFileFooter = null
try {
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(split))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
// add all the column and cardinality to the map
CarbonCompactionUtil
.addColumnCardinalityToMap(columnToCardinalityMap,
dataFileFooter.getColumnInTable,
dataFileFooter.getSegmentInfo.getColumnCardinality)

var splitList = taskIdMapping.get(taskCount.toString)
if (null != splitList && splitList.size == noOfSplitsPerTask) {
taskCount = taskCount + 1
expressionMapForRangeCol.put(taskCount, filterExpr)
splitList = taskIdMapping.get(taskCount.toString)
}
if (null == splitList) {
val splitTempList = new util.ArrayList[CarbonInputSplit]()
splitTempList.add(split)
taskIdMapping.put(taskNo, splitTempList)
} else {
splitList.add(split)
splitList = new util.ArrayList[CarbonInputSplit]()
taskIdMapping.put(taskCount.toString, splitList)
}
splitList.add(split)
}
// Check the cardinality of each columns and set the highest.
try {
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(split))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
// add all the column and cardinality to the map
CarbonCompactionUtil
.addColumnCardinalityToMap(columnToCardinalityMap,
dataFileFooter.getColumnInTable,
dataFileFooter.getSegmentInfo.getColumnCardinality)

// Create taskIdMapping here for range column by reading min/max values.
if (null != rangeColumn) {
if (null == expressionMapForRangeCol) {
expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
} else {
carbonInputSplits.foreach { split =>
var dataFileFooter: DataFileFooter = null
if (null == rangeColumn) {
val taskNo = getTaskNo(split, partitionTaskMap, counter)
var sizeOfSplit = split.getDetailInfo.getBlockSize
val splitList = taskIdMapping.get(taskNo)
noOfBlocks += 1
if (null == splitList) {
val splitTempList = new util.ArrayList[CarbonInputSplit]()
splitTempList.add(split)
taskIdMapping.put(taskNo, splitTempList)
} else {
splitList.add(split)
}
}
// Check the cardinality of each columns and set the highest.
try {
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(split))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
if (-1 == indexOfRangeColumn) {
val allColumns = dataFileFooter.getColumnInTable
for (i <- 0 until allColumns.size()) {
if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
indexOfRangeColumn = i
// add all the column and cardinality to the map
CarbonCompactionUtil
.addColumnCardinalityToMap(columnToCardinalityMap,
dataFileFooter.getColumnInTable,
dataFileFooter.getSegmentInfo.getColumnCardinality)

// Create taskIdMapping here for range column by reading min/max values.
if (null != rangeColumn) {
if (null == expressionMapForRangeCol) {
expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
}
if (-1 == indexOfRangeColumn) {
val allColumns = dataFileFooter.getColumnInTable
for (i <- 0 until allColumns.size()) {
if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
indexOfRangeColumn = i
}
}
}
}
// Create ranges and add splits to the tasks
for (i <- 0 until (newRanges.size + 1)) {
if (null == expressionMapForRangeCol.get(i)) {
// Creating FilterExpression for the range column
var minVal: Object = null
var maxVal: Object = null
// For first task we will create an Or Filter and also accommodate null values
// For last task we will take as GreaterThan Expression of last value
if (i != 0) {
minVal = newRanges(i - 1)
// Create ranges and add splits to the tasks
for (i <- 0 until (newRanges.size + 1)) {
if (null == expressionMapForRangeCol.get(i)) {
// Creating FilterExpression for the range column
var minVal: Object = null
var maxVal: Object = null
// For first task we will create an Or Filter and also accommodate null values
// For last task we will take as GreaterThan Expression of last value
if (i != 0) {
minVal = newRanges(i - 1)
}
if (i != newRanges.size) {
maxVal = newRanges(i)
}
val filterExpr = CarbonCompactionUtil
.getFilterExpressionForRange(rangeColumn,
minVal, maxVal, dataType)
expressionMapForRangeCol.put(i, filterExpr)
}
if (i != newRanges.size) {
maxVal = newRanges(i)
var splitList = taskIdMapping.get(i.toString)
noOfBlocks += 1
if (null == splitList) {
splitList = new util.ArrayList[CarbonInputSplit]()
taskIdMapping.put(i.toString, splitList)
}
val filterExpr = CarbonCompactionUtil
.getFilterExpressionForRange(rangeColumn,
minVal, maxVal, dataType)
expressionMapForRangeCol.put(i, filterExpr)
}
var splitList = taskIdMapping.get(i.toString)
noOfBlocks += 1
if (null == splitList) {
splitList = new util.ArrayList[CarbonInputSplit]()
taskIdMapping.put(i.toString, splitList)
splitList.add(split)
}
splitList.add(split)
}
}
}
Expand Down

0 comments on commit cbab084

Please sign in to comment.