Refactored code in Driver side for Range Values
manishnalla1994 committed May 3, 2019
1 parent d73b0ab commit 2da1899
Showing 5 changed files with 68 additions and 205 deletions.

This file was deleted.

@@ -579,12 +579,12 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
sql("DROP TABLE IF EXISTS carbon_range_column1")
}

test("Test compaction for range_column - STRING Datatype minmax not stored") {
test("Test compaction for range_column - STRING Datatype min/max not stored") {
deleteFile(filePath2)
createFile(filePath2, 100000, 7)
createFile(filePath2, 1000, 7)
sql("DROP TABLE IF EXISTS carbon_range_column1")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "10")
sql("DROP TABLE IF EXISTS carbon_range_column1")
sql(
"""
| CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
@@ -610,6 +610,7 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT,
CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
deleteFile(filePath2)
}

test("Test compaction for range_column - DATE Datatype") {
@@ -783,9 +784,8 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
): Unit = {
val boundsBuffer = new ArrayBuffer[Object]()
bounds.map(_.getBytes()).foreach(boundsBuffer += _)
val minMax: Array[Object] = Array()
val (_, actualSkewCount, actualSkewIndexes, actualSkewWeights, newMinMax) =
partitioner.combineDataSkew(boundsBuffer, minMax)
val (_, actualSkewCount, actualSkewIndexes, actualSkewWeights) =
partitioner.combineDataSkew(boundsBuffer)
assertResult(skewCount)(actualSkewCount)
if (skewCount > 0) {
assertResult(skewIndexes)(actualSkewIndexes)
@@ -40,23 +40,20 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.RangeValues
import org.apache.carbondata.core.datastore.block._
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.expression
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -392,12 +389,8 @@ class CarbonMergerRDD[K, V](
}

val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[RangeValues] = new Array[RangeValues](0)
var allRanges: Array[Object] = new Array[Object](0)
if (rangeColumn != null) {
// Get the overall min/max for all segments
var (minVal, maxVal): (Object, Object) = getOverallMinMax(carbonInputSplits,
rangeColumn,
isRangeColSortCol)
// Calculate the number of ranges to be made; at least 2 ranges/tasks are created in any case
val numOfPartitions = Math
.max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, DataLoadProcessBuilderOnSpark
@@ -407,9 +400,7 @@ class CarbonMergerRDD[K, V](
carbonTable,
numOfPartitions,
allSplits,
dataType,
minVal,
maxVal)
dataType)
}

// prepare the details required to extract the segment properties using last segment.
@@ -476,12 +467,22 @@ class CarbonMergerRDD[K, V](
}
}
// Create ranges and add splits to the tasks
for (i <- 0 until allRanges.size) {
for (i <- 0 until (allRanges.size + 1)) {
if (null == expressionMapForRangeCol.get(i)) {
// Creating FilterExpression for the range column
var minVal: Object = null
var maxVal: Object = null
// For the first task we create an Or filter and also accommodate null values
// For the last task we use a GreaterThan expression on the last value
if (i != 0) {
minVal = allRanges(i - 1)
}
if (i != allRanges.size) {
maxVal = allRanges(i)
}
val filterExpr = CarbonCompactionUtil
.getFilterExpressionForRange(rangeColumn, i,
allRanges(i).getMinVal, allRanges(i).getMaxVal, dataType)
.getFilterExpressionForRange(rangeColumn,
minVal, maxVal, dataType)
expressionMapForRangeCol.put(i, filterExpr)
}
var splitList = taskIdMapping.get(i.toString)
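With the overall min/max lookup gone, the number of compaction tasks is driven purely by the computed range bounds: N bounds yield N + 1 tasks, where the first task has no lower bound (its Or filter also picks up nulls) and the last task has no upper bound. The following standalone Scala sketch is not part of the patch and uses made-up names; it only illustrates that pairing:

object RangeTaskSketch {
  // For N sorted bounds, produce N + 1 (minVal, maxVal) pairs; None means
  // "unbounded" on that side, mirroring the nulls passed to
  // getFilterExpressionForRange above.
  def taskRanges[T](allRanges: Array[T]): Seq[(Option[T], Option[T])] = {
    (0 to allRanges.length).map { i =>
      val minVal = if (i != 0) Some(allRanges(i - 1)) else None
      val maxVal = if (i != allRanges.length) Some(allRanges(i)) else None
      (minVal, maxVal)
    }
  }

  def main(args: Array[String]): Unit = {
    // Three bounds -> four tasks: (-inf, 10], (10, 20], (20, 30], (30, +inf)
    taskRanges(Array(10, 20, 30)).foreach(println)
  }
}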
@@ -579,9 +580,7 @@ class CarbonMergerRDD[K, V](
carbonTable: CarbonTable,
defaultParallelism: Int,
allSplits: java.util.ArrayList[InputSplit],
dataType: DataType,
minVal: Object,
maxVal: Object): Array[RangeValues] = {
dataType: DataType): Array[Object] = {
val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
val projection = new CarbonProjection
projection.addColumn(rangeColumn.getColName)
@@ -604,88 +603,7 @@ class CarbonMergerRDD[K, V](
val sortedRdd = sampleRdd.sortBy(_._1, true)(objectOrdering, classTag[AnyRef])
val value = new DataSkewRangePartitioner(
defaultParallelism, sortedRdd, true)(objectOrdering, classTag[Object])
if(minVal == null && maxVal == null) {
CarbonCompactionUtil
.getRangesFromVals(value.rangeBounds, value.rangeBounds(0), value.rangeBounds(1))
} else {
CarbonCompactionUtil.getRangesFromVals(value.rangeBounds, minVal, maxVal)
}
}

def getOverallMinMax(carbonInputSplits: mutable.Seq[CarbonInputSplit],
rangeCol: CarbonColumn, isSortCol: Boolean): (Object, Object) = {
var minVal: Array[Byte] = null
var maxVal: Array[Byte] = null
var dictMinVal: Int = Integer.MAX_VALUE
var dictMaxVal: Int = Integer.MIN_VALUE
var idx: Int = -1
val dataType = rangeCol.getDataType
val isDictEncode = rangeCol.hasEncoding(Encoding.DICTIONARY) &&
!rangeCol.hasEncoding(Encoding.DIRECT_DICTIONARY)
carbonInputSplits.foreach { split =>
var dataFileFooter: DataFileFooter = null
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(split), true)
if (-1 == idx) {
val allColumns = dataFileFooter.getColumnInTable
for (i <- 0 until allColumns.size()) {
if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeCol.getColName)) {
idx = i
}
}
}
if(isDictEncode) {
var tempMin = dataFileFooter.getBlockletIndex.getMinMaxIndex.getMinValues.apply(idx)
var tempMinVal = CarbonUtil.getSurrogateInternal(tempMin, 0, tempMin.length)
var tempMax = dataFileFooter.getBlockletIndex.getMinMaxIndex.getMaxValues.apply(idx)
var tempMaxVal = CarbonUtil.getSurrogateInternal(tempMax, 0, tempMax.length)
if (dictMinVal > tempMinVal) {
dictMinVal = tempMinVal
}
if (dictMaxVal < tempMaxVal) {
dictMaxVal = tempMaxVal
}
} else {
if (null == minVal) {
minVal = dataFileFooter.getBlockletIndex.getMinMaxIndex.getMinValues.apply(idx)
maxVal = dataFileFooter.getBlockletIndex.getMinMaxIndex.getMaxValues.apply(idx)
} else {
val tempMin = dataFileFooter.getBlockletIndex.getMinMaxIndex.getMinValues.apply(idx)
val tempMax = dataFileFooter.getBlockletIndex.getMinMaxIndex.getMaxValues.apply(idx)
if (ByteUtil.compare(tempMin, minVal) <= 0) {
minVal = tempMin
}
if (ByteUtil.compare(tempMax, maxVal) >= 0) {
maxVal = tempMax
}
}
}
}
// Based on how min/max value is stored in the footer we change the data

if (isDictEncode) {
(dictMinVal.asInstanceOf[AnyRef], dictMaxVal.asInstanceOf[AnyRef])
} else {
if (!isSortCol && (dataType == DataTypes.INT || dataType == DataTypes.LONG)) {
(ByteUtil.toLong(minVal, 0, minVal.length).asInstanceOf[AnyRef],
ByteUtil.toLong(maxVal, 0, maxVal.length).asInstanceOf[AnyRef])
} else if (dataType == DataTypes.DATE) {
(new DateDirectDictionaryGenerator(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT)).
getValueFromSurrogate(ByteUtil.toInt(minVal, 0, minVal.length)),
new DateDirectDictionaryGenerator(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT)).
getValueFromSurrogate(ByteUtil.toInt(maxVal, 0, maxVal.length)))
} else if (dataType == DataTypes.DOUBLE) {
(ByteUtil.toDouble(minVal, 0, minVal.length).asInstanceOf[AnyRef],
ByteUtil.toDouble(maxVal, 0, maxVal.length).asInstanceOf[AnyRef])
} else {
(DataTypeUtil.
getDataBasedOnDataTypeForNoDictionaryColumn(minVal, dataType, true),
DataTypeUtil.
getDataBasedOnDataTypeForNoDictionaryColumn(maxVal, dataType, true))
}
}
value.rangeBounds
}
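getRanges now returns the partitioner's rangeBounds directly instead of converting them, together with an overall min/max, into RangeValues. The idea behind those bounds is cut points taken from a sorted sample of the range column. The sketch below shows that idea on plain collections; it is only an approximation of DataSkewRangePartitioner (which samples per RDD partition and weights candidates), and the names are made up:

object SampleBoundsSketch {
  // Derive (partitions - 1) approximate cut points from a sample of the
  // range column; illustration only.
  def rangeBounds[T: Ordering](sample: Seq[T], partitions: Int): Seq[T] = {
    val sorted = sample.sorted
    (1 until partitions).map { j =>
      // take the value at the j-th fraction of the sorted sample
      val idx = math.min(sorted.length - 1, j * sorted.length / partitions)
      sorted(idx)
    }.distinct
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(5, 1, 9, 3, 7, 2, 8, 4, 6, 10)
    // 4 partitions -> 3 bounds: 3, 6, 8 for this sample
    println(rangeBounds(sample, 4).mkString(", "))
  }
}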

private def getTaskNo(
@@ -95,9 +95,9 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
// dataSkewNum: how many partitions each data skew bound covers
// Min and Max values of complete range
var (rangeBounds: Array[K], skewCount: Int, skewIndexes: Array[Int],
skewWeights: Array[Int], minMaxVals: Array[K]) = {
skewWeights: Array[Int]) = {
if (partitions <= 1) {
(Array.empty[K], 0, Array.empty[Int], Array.empty[Int], Array.empty[K])
(Array.empty[K], 0, Array.empty[Int], Array.empty[Int])
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
@@ -140,7 +140,7 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
var ranges = RangePartitioner.determineBounds(candidates, partitions)
var otherRangeParams = determineBounds(candidates, partitions, true)
(ranges, otherRangeParams._2, otherRangeParams._3,
otherRangeParams._4, otherRangeParams._5)
otherRangeParams._4)
}
}
}
@@ -149,50 +149,40 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
def determineBounds(
candidates: ArrayBuffer[(K, Float)],
partitions: Int,
withoutSkew: Boolean): (Array[K], Int, Array[Int], Array[Int], Array[K]) = {
withoutSkew: Boolean): (Array[K], Int, Array[Int], Array[Int]) = {
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
var minMax = new Array[K](2)
if (numCandidates > 0) {
minMax(0) = ordered(0)._1
minMax(1) = ordered(numCandidates - 1)._1
}
if (withoutSkew) {
(Array.empty[K], 0, Array.empty[Int], Array.empty[Int], minMax)
} else {
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gteq(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gteq(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
i += 1
}
i += 1
}

if (bounds.size >= 2) {
combineDataSkew(bounds, minMax)
} else {
(bounds.toArray, 0, Array.empty[Int], Array.empty[Int], minMax)
}
if (bounds.size >= 2) {
combineDataSkew(bounds)
} else {
(bounds.toArray, 0, Array.empty[Int], Array.empty[Int])
}
}
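determineBounds keeps the same cumulative-weight walk over the sorted candidates and only drops the trailing min/max bookkeeping: a bound is emitted each time the running weight crosses another multiple of sumWeights / partitions. A runnable, self-contained replay of that loop on plain Scala collections (illustrative names; equal weights in the demo):

object WeightedBoundsSketch {
  // Re-implementation of the cumulative-weight loop above on plain
  // collections; illustration only, not the CarbonData class itself.
  def bounds[K](ordered: Seq[(K, Float)], partitions: Int)
               (implicit ord: Ordering[K]): Seq[K] = {
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val out = scala.collection.mutable.ArrayBuffer.empty[K]
    var previous = Option.empty[K]
    var i = 0
    while (i < ordered.size && out.size < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      // Emit a bound when the running weight crosses the next target,
      // skipping duplicate keys exactly like the loop above.
      if (cumWeight >= target &&
          (previous.isEmpty || ord.gteq(key, previous.get))) {
        out += key
        target += step
        previous = Some(key)
      }
      i += 1
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sample = (1 to 12).map(k => (k, 1.0f))   // evenly weighted keys 1..12
    println(bounds(sample, 4).mkString(", "))    // prints 3, 6, 9
  }
}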

def combineDataSkew(bounds: ArrayBuffer[K],
minMax: Array[K]): (Array[K], Int, Array[Int], Array[Int], Array[K]) = {
def combineDataSkew(bounds: ArrayBuffer[K]): (Array[K], Int, Array[Int], Array[Int]) = {
val finalBounds = ArrayBuffer.empty[K]
var preBound = bounds(0)
finalBounds += preBound
@@ -220,9 +210,9 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
}
if (dataSkewIndexTmp.size > 0) {
(finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, dataSkewNumTmp
.toArray, minMax)
.toArray)
} else {
(finalBounds.toArray, 0, Array.empty[Int], Array.empty[Int], minMax)
(finalBounds.toArray, 0, Array.empty[Int], Array.empty[Int])
}
}
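
combineDataSkew now returns a 4-tuple of (bounds, skew count, skew indexes, skew weights), matching the updated test assertions earlier in this commit. A rough standalone sketch of the assumed behaviour (not the exact CarbonData implementation): runs of equal bounds, produced by a heavily repeated key, are collapsed into a single bound whose position and multiplicity are recorded:

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

object CombineDataSkewSketch {
  // Assumed behaviour, for illustration only: collapse runs of equal bounds
  // and record, for each collapsed bound, its index in the final bounds plus
  // how many ranges the run spanned, so a skewed key can later be spread
  // across several tasks.
  def combine[T: ClassTag](bounds: Seq[T]): (Array[T], Int, Array[Int], Array[Int]) = {
    val finalBounds = ArrayBuffer.empty[T]
    val skewIndexes = ArrayBuffer.empty[Int]
    val skewWeights = ArrayBuffer.empty[Int]
    var run = 1
    bounds.zipWithIndex.foreach { case (b, i) =>
      if (i > 0 && b == bounds(i - 1)) {
        run += 1                         // same bound repeated -> data skew
      } else {
        if (run > 1) {                   // close the previous run of duplicates
          skewIndexes += finalBounds.length - 1
          skewWeights += run
        }
        run = 1
        finalBounds += b
      }
    }
    if (run > 1) {                       // a run may end at the last bound
      skewIndexes += finalBounds.length - 1
      skewWeights += run
    }
    (finalBounds.toArray, skewIndexes.length, skewIndexes.toArray, skewWeights.toArray)
  }

  def main(args: Array[String]): Unit = {
    // "b" appears three times -> one skewed bound at index 1 covering 3 ranges
    val (bounds, count, indexes, weights) = combine(Seq("a", "b", "b", "b", "c"))
    println(s"${bounds.mkString(",")} count=$count " +
      s"indexes=${indexes.mkString(",")} weights=${weights.mkString(",")}")
    // a,b,c count=1 indexes=1 weights=3
  }
}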

