diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index 46be8d8410b..5cf6163fc1e 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -165,4 +165,11 @@ public final class CarbonLoadOptionConstants { = "carbon.load.sortmemory.spill.percentage"; public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT = "0"; + + /** + * For Range_Column, SCALE_FACTOR is used to control the size of each partition. + * When SCALE_FACTOR roughly matches the compression ratio of CarbonData, each task will generate one CarbonData file, + * and the size of the file will be about the TABLE_BLOCKSIZE of this table. + */ + public static final int CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = 3; } diff --git a/integration/spark-common-test/src/test/resources/range_column/dataskew.csv b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv new file mode 100644 index 00000000000..fb77a5de810 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv @@ -0,0 +1,11 @@ +id,name,city,age +1,,wuhan,91 +2,,hangzhou,102 +3,,beijing,112 +4,,shenzhen,124 +5,e,shenzhen,65 +6,f,beijing,76 +7,g,hangzhou,37 +8,h,wuhan,48 +9,i,,89 +10,j,,50 \ No newline at end of file diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala new file mode 100644 index 00000000000..14f11e545f8 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.spark.testsuite.dataload + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.util.path.CarbonTablePath + +class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + var filePath: String = s"$resourcesPath/globalsort" + + override def beforeAll(): Unit = { + dropTable + } + + override def afterAll(): Unit = { + dropTable + } + + def dropTable(): Unit = { + sql("DROP TABLE IF EXISTS carbon_range_column1") + sql("DROP TABLE IF EXISTS carbon_range_column2") + sql("DROP TABLE IF EXISTS carbon_range_column3") + sql("DROP TABLE IF EXISTS carbon_range_column4") + } + + test("range_column with option GLOBAL_SORT_PARTITIONS") { + sql( + """ + | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " + + "OPTIONS('GLOBAL_SORT_PARTITIONS'='1', 'range_column'='name')") + + assert(getIndexFileCount("carbon_range_column1") === 1) + checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column1"), Seq(Row(12))) + checkAnswer(sql("SELECT * FROM carbon_range_column1"), + sql("SELECT * FROM carbon_range_column1 ORDER BY name")) + } + + test("range_column with option scale_factor") { + sql( + """ + | CREATE TABLE carbon_range_column2(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column2 " + + "OPTIONS('scale_factor'='10', 'range_column'='name')") + + assert(getIndexFileCount("carbon_range_column2") === 1) + checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column2"), Seq(Row(12))) + checkAnswer(sql("SELECT * FROM carbon_range_column2"), + sql("SELECT * FROM carbon_range_column2 ORDER BY name")) + } + + test("range_column only support single column ") { + sql( + """ + | CREATE TABLE carbon_range_column3(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city') + """.stripMargin) + + intercept[InvalidLoadOptionException] { + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column3 " + + "OPTIONS('scale_factor'='10', 'range_column'='name,id')") + } + } + + test("range_column with data skew") { + sql( + """ + | CREATE TABLE carbon_range_column4(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='name, city') + """.stripMargin) + + val dataSkewPath = s"$resourcesPath/range_column" + + sql(s"LOAD DATA LOCAL INPATH '$dataSkewPath' INTO TABLE carbon_range_column4 " + + "OPTIONS('GLOBAL_SORT_PARTITIONS'='5', 'range_column'='name', " + + "'BAD_RECORDS_ACTION'='force')") + + assert(getIndexFileCount("carbon_range_column4") 
=== 5) + checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(10))) + } + + private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) + if (FileFactory.isFileExist(segmentDir)) { + new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() + } else { + val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath) + new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles + .size() + } + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala index 55116456a79..f6292607e81 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala @@ -67,8 +67,10 @@ object CsvRDDHelper { val jobContext = new JobContextImpl(jobConf, null) val inputFormat = new CSVInputFormat() val rawSplits = inputFormat.getSplits(jobContext).toArray + var totalLength = 0L val splitFiles = rawSplits.map { split => val fileSplit = split.asInstanceOf[FileSplit] + totalLength = totalLength + fileSplit.getLength PartitionedFile( InternalRow.empty, fileSplit.getPath.toString, @@ -76,6 +78,7 @@ object CsvRDDHelper { fileSplit.getLength, fileSplit.getLocations) }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + model.setTotalSize(totalLength) val totalBytes = splitFiles.map(_.length + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 8ded6bd1147..ec1153a10e5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -20,19 +20,24 @@ package org.apache.carbondata.spark.load import java.util.Comparator import org.apache.hadoop.conf.Configuration -import org.apache.spark.TaskContext +import org.apache.spark.{Accumulator, DataSkewRangePartitioner, RangePartitioner, TaskContext} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.command.ExecutionErrors import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil -import 
org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer +import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses} +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil @@ -139,6 +144,11 @@ object DataLoadProcessBuilderOnSpark { LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + updateLoadStatus(model, partialSuccessAccum) + } + + private def updateLoadStatus(model: CarbonLoadModel, partialSuccessAccum: Accumulator[Int] + ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { // Update status if (partialSuccessAccum.value != 0) { val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + @@ -156,4 +166,194 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and write it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + // initialize and prepare row counter + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") + val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") + val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + + // 1. Input + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark.internalInputFunc( + rows, index, modelBroadcast, Option(inputStepRowCounter), Option.empty) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + + // 3.
Range partition by range_column + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) + // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] + val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) + // range partition by key + val numPartitions = getNumPartitions(configuration, model, convertRDD) + val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) + import scala.reflect.classTag + val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) + val rangeRDD = keyRDD + .partitionBy( + new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + + // 4. Sort and Write data + sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, + writeStepRowCounter, conf.value.value)) + + // Log the number of rows in each step + LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) + LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + + // Update status + updateLoadStatus(model, partialSuccessAccum) + } + + /** + * provide RDD for sample + * CSVRecordReader(univocity parser) will output only one column + */ + private def getSampleRDD( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration, + configuration: CarbonDataLoadConfiguration, + modelBroadcast: Broadcast[CarbonLoadModel] + ): RDD[(Object, Object)] = { + // initialize and prepare row counter + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val header = configuration.getHeader + val rangeColumn = model.getRangePartitionColumn + val rangeColumnIndex = (0 until header.length).find{ + index => + header(index).equalsIgnoreCase(rangeColumn.getColName) + }.get + val rangeField = configuration + .getDataFields + .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName)) + .get + + // 1. Input + val newHadoopConf = new Configuration(hadoopConf) + newHadoopConf + .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, newHadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, Option.empty, Option(rangeField)) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .sampleConvertFunc(rows, rangeField, index, modelBroadcast) + } + .filter(_ != null) + + convertRDD.map(row => (row.getObject(0), null)) + } + + /** + * calculate the number of partitions. 
+ */ + private def getNumPartitions( + configuration: CarbonDataLoadConfiguration, + model: CarbonLoadModel, + convertRDD: RDD[CarbonRow] + ): Int = { + var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)) + if (numPartitions <= 0) { + if (model.getTotalSize <= 0) { + numPartitions = convertRDD.partitions.length + } else { + // calculate the number of partitions + // better to generate a CarbonData file for each partition + val totalSize = model.getTotalSize.toDouble + val table = model.getCarbonDataLoadSchema.getCarbonTable + val blockSize = 1024L * 1024 * table.getBlockSizeInMB + val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB + val scaleFactor = if (model.getScaleFactor == 0) { + // here it assumes the compression ratio of CarbonData is about 30%, + // so it multiply by 3 to get the split size of CSV files. + CarbonLoadOptionConstants.CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT + } else { + model.getScaleFactor + } + // For Range_Column, it will try to generate one big file for each partition. + // And the size of the big file is about TABLE_BLOCKSIZE of this table. + val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * scaleFactor + numPartitions = Math.ceil(totalSize / splitSize).toInt + } + } + numPartitions + } + + private def indexOfColumn(column: CarbonColumn, fields: Array[DataField]): Int = { + (0 until fields.length) + .find(index => fields(index).getColumn.getColName.equals(column.getColName)) + .get + } + + private def createOrderingForColumn(column: CarbonColumn): Ordering[Object] = { + if (column.isDimension) { + val dimension = column.asInstanceOf[CarbonDimension] + if (dimension.isGlobalDictionaryEncoding || dimension.isDirectDictionaryEncoding) { + new PrimtiveOrdering(DataTypes.INT) + } else { + if (DataTypeUtil.isPrimitiveColumn(column.getDataType)) { + new PrimtiveOrdering(column.getDataType) + } else { + new ByteArrayOrdering() + } + } + } else { + new PrimtiveOrdering(column.getDataType) + } + } +} + +class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] { + val comparator = org.apache.carbondata.core.util.comparator.Comparator + .getComparator(dataType) + + override def compare(x: Object, y: Object): Int = { + comparator.compare(x, y) + } +} + +class ByteArrayOrdering() extends Ordering[Object] { + override def compare(x: Object, y: Object): Int = { + UnsafeComparer.INSTANCE.compareTo(x.asInstanceOf[Array[Byte]], y.asInstanceOf[Array[Byte]]) + } } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 2ca47b39497..ff0e1bf6080 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -17,9 +17,11 @@ package org.apache.carbondata.spark.load +import java.util + import com.univocity.parsers.common.TextParsingException import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Accumulator, SparkEnv, TaskContext} +import org.apache.spark.{Accumulator, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow @@ -30,13 +32,15 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations} +import org.apache.carbondata.processing.loading._ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl +import org.apache.carbondata.processing.loading.parser.RowParser +import org.apache.carbondata.processing.loading.parser.impl.{RangeColumnParserImpl, RowParserImpl} +import org.apache.carbondata.processing.loading.row.CarbonRowBatch import org.apache.carbondata.processing.loading.sort.SortStepRowHandler -import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl +import org.apache.carbondata.processing.loading.steps.{DataWriterProcessorStepImpl, SortProcessorStepImpl} import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil} @@ -79,8 +83,6 @@ object DataLoadProcessorStepOnSpark { new Iterator[CarbonRow] { override def hasNext: Boolean = rows.hasNext - - override def next(): CarbonRow = { var row : CarbonRow = null if(isRawDataRequired) { @@ -95,6 +97,44 @@ object DataLoadProcessorStepOnSpark { } } + def internalInputFunc( + rows: Iterator[InternalRow], + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + rowCounter: Option[Accumulator[Int]], + rangeField: Option[DataField]): Iterator[CarbonRow] = { + val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) + val conf = DataLoadProcessBuilder.createConfiguration(model) + val rowParser: RowParser = if (rangeField.isEmpty) { + new RowParserImpl(conf.getDataFields, conf) + } else { + new RangeColumnParserImpl(rangeField.get, conf) + } + val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf) + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + wrapException(e, model) + } + + new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { + var row: CarbonRow = null + val rawRow = + rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]] + if (isRawDataRequired) { + row = new CarbonRow(rowParser.parseRow(rawRow), rawRow) + } else { + row = new CarbonRow(rowParser.parseRow(rawRow)) + } + if (rowCounter.isDefined) { + rowCounter.get.add(1) + } + row + } + } + } + def inputAndconvertFunc( rows: Iterator[Array[AnyRef]], index: Int, @@ -179,9 +219,33 @@ object DataLoadProcessorStepOnSpark { override def hasNext: Boolean = rows.hasNext override def next(): CarbonRow = { - val row = rowConverter.convert(rows.next()) rowCounter.add(1) - row + rowConverter.convert(rows.next()) + } + } + } + + def sampleConvertFunc( + rows: Iterator[CarbonRow], + rangeField: DataField, + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel] + ): Iterator[CarbonRow] = { + val model: CarbonLoadModel = 
modelBroadcast.value.getCopyWithTaskNo(index.toString) + val conf = DataLoadProcessBuilder.createConfiguration(model) + val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf) + val rowConverter = new RowConverterImpl(Array(rangeField), conf, badRecordLogger) + rowConverter.initialize() + + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + wrapException(e, model) + } + + new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { + rowConverter.convert(rows.next()) + } + } + } @@ -305,4 +369,107 @@ object DataLoadProcessorStepOnSpark { e) } } + + def sortAndWriteFunc( + rows: Iterator[CarbonRow], + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + rowCounter: Accumulator[Int], + conf: Configuration) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)) + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf) + var model: CarbonLoadModel = null + var tableName: String = null + var inputProcessor: NewInputProcessorStepImpl = null + var rowConverter: RowConverterImpl = null + var sortProcessor: SortProcessorStepImpl = null + var dataWriter: DataWriterProcessorStepImpl = null + try { + model = modelBroadcast.value.getCopyWithTaskNo(index.toString) + val storeLocation = CommonUtil.getTempStoreLocations(index.toString) + val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation) + tableName = model.getTableName + + rowConverter = new RowConverterImpl(conf.getDataFields, conf, null) + rowConverter.initialize() + conf.setCardinalityFinder(rowConverter) + + inputProcessor = new NewInputProcessorStepImpl(conf, rows) + sortProcessor = new SortProcessorStepImpl(conf, inputProcessor) + dataWriter = new DataWriterProcessorStepImpl(conf, sortProcessor) + dataWriter.initialize() + dataWriter.execute() + } catch { + case e: CarbonDataWriterException => + LOGGER.error("Failed for table: " + tableName + " in Data Writer Step", e) + throw new CarbonDataLoadingException("Error while initializing data handler : " + + e.getMessage) + case e: Exception => + LOGGER.error("Failed for table: " + tableName + " in Data Writer Step", e) + throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage, e) + } finally { + if (rowConverter != null) { + rowConverter.finish() + } + // close the dataWriter once the write is done, whether it succeeded or failed. If not closed, the thread + // that prints the rows processed in each step every 10 seconds will never exit. + if(null != dataWriter) { + dataWriter.close() + } + // clean up the folders and files created locally for data load operation + TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false) + } + } +} + +class NewInputProcessorStepImpl(configuration: CarbonDataLoadConfiguration, + rows: Iterator[CarbonRow]) extends AbstractDataLoadProcessorStep(configuration, null) { + /** + * Transform the data as per the implementation. + * + * @return Array of Iterator with data.
It can be processed in parallel if the implementation class wants + * @throws CarbonDataLoadingException + */ + override def execute(): Array[util.Iterator[CarbonRowBatch]] = { + val batchSize = CarbonProperties.getInstance.getBatchSize + val iteratorArray = new Array[util.Iterator[CarbonRowBatch]](1) + + iteratorArray(0) = new util.Iterator[CarbonRowBatch] { + + val rowBatch = new CarbonRowBatch(batchSize) { + var count = 0 + override def next(): CarbonRow = { + count = count + 1 + rows.next() + } + override def hasNext: Boolean = rows.hasNext && count < batchSize + + def reset(): Unit = { + count = 0 + } + } + + override def next(): CarbonRowBatch = { + rowBatch.reset() + rowBatch + } + + override def hasNext: Boolean = { + rows.hasNext + } + } + + iteratorArray + } + + /** + * Get the step name for logging purpose. + * + * @return Step name + */ + override protected def getStepName: String = { + "Input Processor for RANGE_SORT" + } } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala new file mode 100644 index 00000000000..07ad0117848 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag +import scala.util.hashing.byteswap32 + +import org.apache.spark.rdd.{PartitionPruningRDD, RDD} +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{CollectionsUtils, Utils} + +/** + * supports the data skew scenario + * copied from Spark's RangePartitioner + * + * RangePartitioner: + * the rangeBounds are the distinct values, and each rangeBound gets one partition, + * so in a data skew scenario some partitions will include much more data. + * + * DataSkewRangePartitioner: + * the rangeBounds are also the distinct values, but it also calculates the skew weight, + * so some rangeBounds may have more than one partition. + * + * for example, splitting the following CSV file into 5 partitions: + * --------------- + * col1,col2 + * 1, + * 2, + * 3, + * 4, + * 5,e + * 6,f + * 7,g + * 8,h + * 9,i + * 10,j + * --------------- + * RangePartitioner will give the following rangeBounds.
+ * ------------------------------------------------- + * | range bound| partition range| number of values| + * ------------------------------------------------- + * | null | <= null | 4 | + * | e | (null, e] | 1 | + * | f | (e, f] | 1 | + * | h | (f, h] | 2 | + * | | > h | 2 | + * ------------------------------------------------- + * + * DataSkewRangePartitioner will give the following rangeBounds. + * -------------------------------------------------------------- + * | range bound| skew weight| partition range| number of values| + * -------------------------------------------------------------- + * | null | 2 | <= null | 2(4/2) | + * | f | | (null, f] | 2 | + * | h | | (f, h] | 2 | + * | | | > h | 2 | + * | | | <= null | 2(4/2) | + * -------------------------------------------------------------- + * The skew weight of range bound "null" is 2. + * So it will start two tasks for range bound "null" to create two partitions. + */ +class DataSkewRangePartitioner[K: Ordering : ClassTag, V]( + partitions: Int, + rdd: RDD[_ <: Product2[K, V]]) + extends Partitioner { + + // We allow partitions = 0, which happens when sorting an empty RDD under the default settings. + require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.") + + private var ordering = implicitly[Ordering[K]] + + // An array of upper bounds for the first (partitions - 1) partitions + // dataSkewCount: how many bounds happened data skew + // dataSkewIndex: the index of data skew bounds + // dataSkewNum: how many partition of each data skew bound + private var (rangeBounds: Array[K], skewCount: Int, skewIndexes: Array[Int], + skewWeights: Array[Int]) = { + if (partitions <= 1) { + (Array.empty[K], 0, Array.empty[Int], Array.empty[Int]) + } else { + // This is the sample size we need to have roughly balanced output partitions, capped at 1M. + val sampleSize = math.min(20.0 * partitions, 1e6) + // Assume the input partitions are roughly balanced and over-sample a little bit. + val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt + val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) + if (numItems == 0L) { + (Array.empty[K], 0, Array.empty[Int], Array.empty[Int]) + } else { + // If a partition contains much more than the average number of items, we re-sample from it + // to ensure that enough items are collected from that partition. + val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) + val candidates = ArrayBuffer.empty[(K, Float)] + val imbalancedPartitions = mutable.Set.empty[Int] + sketched.foreach { case (idx, n, sample) => + if (fraction * n > sampleSizePerPartition) { + imbalancedPartitions += idx + } else { + // The weight is 1 over the sampling probability. + val weight = (n.toDouble / sample.length).toFloat + for (key <- sample) { + candidates += ((key, weight)) + } + } + } + if (imbalancedPartitions.nonEmpty) { + // Re-sample imbalanced partitions with the desired sampling probability. 
+ val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) + val seed = byteswap32(-rdd.id - 1) + val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() + val weight = (1.0 / fraction).toFloat + candidates ++= reSampled.map(x => (x, weight)) + } + determineBounds(candidates, partitions) + } + } + } + + def determineBounds( + candidates: ArrayBuffer[(K, Float)], + partitions: Int): (Array[K], Int, Array[Int], Array[Int]) = { + val ordered = candidates.sortBy(_._1) + val numCandidates = ordered.size + val sumWeights = ordered.map(_._2.toDouble).sum + val step = sumWeights / partitions + var cumWeight = 0.0 + var target = step + val bounds = ArrayBuffer.empty[K] + var i = 0 + var j = 0 + var previousBound = Option.empty[K] + while ((i < numCandidates) && (j < partitions - 1)) { + val (key, weight) = ordered(i) + cumWeight += weight + if (cumWeight >= target) { + // Skip duplicate values. + if (previousBound.isEmpty || ordering.gteq(key, previousBound.get)) { + bounds += key + target += step + j += 1 + previousBound = Some(key) + } + } + i += 1 + } + + if (bounds.size >= 2) { + combineDataSkew(bounds) + } else { + (bounds.toArray, 0, Array.empty[Int], Array.empty[Int]) + } + } + + def combineDataSkew(bounds: ArrayBuffer[K]): (Array[K], Int, Array[Int], Array[Int]) = { + val finalBounds = ArrayBuffer.empty[K] + var preBound = bounds(0) + finalBounds += preBound + val dataSkewIndexTmp = ArrayBuffer.empty[Int] + val dataSkewNumTmp = ArrayBuffer.empty[Int] + var dataSkewCountTmp = 1 + (1 until bounds.size).foreach { index => + val bound = bounds(index) + if (ordering.equiv(bound, preBound)) { + if (dataSkewCountTmp == 1) { + dataSkewIndexTmp += (finalBounds.size - 1) + } + dataSkewCountTmp += 1 + } else { + finalBounds += bound + preBound = bound + if (dataSkewCountTmp > 1) { + dataSkewNumTmp += dataSkewCountTmp + dataSkewCountTmp = 1 + } + } + } + if (dataSkewIndexTmp.size > 0) { + (finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, dataSkewNumTmp.toArray) + } else { + (finalBounds.toArray, 0, Array.empty[Int], Array.empty[Int]) + } + } + + private val skewPartitions: Int = if (skewCount == 0) { + 0 + } else { + skewWeights.map(_ - 1).sum + } + + def numPartitions: Int = rangeBounds.length + 1 + skewPartitions + + private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] + private var skewIndexesLen = 0 + private var dsps: Array[DataSkewPartitioner] = null + + def initialize(): Unit = { + if (skewCount > 0) { + skewIndexesLen = skewIndexes.length + dsps = new Array[DataSkewPartitioner](skewIndexesLen) + var previousPart = rangeBounds.length + for (i <- 0 until skewIndexesLen) { + dsps(i) = new DataSkewPartitioner(skewIndexes(i), previousPart, skewWeights(i)) + previousPart = previousPart + skewWeights(i) - 1 + } + } + } + + private var needInit = true + + def getPartition(key: Any): Int = { + if (needInit) { + needInit = false + initialize() + } + val k = key.asInstanceOf[K] + var partition = 0 + if (rangeBounds.length <= 128) { + // If we have less than 128 partitions naive search + while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { + partition += 1 + } + } else { + // Determine which binary search method to use only once. 
+ partition = binarySearch(rangeBounds, k) + // binarySearch either returns the match location or -[insertion point]-1 + if (partition < 0) { + partition = -partition - 1 + } + if (partition > rangeBounds.length) { + partition = rangeBounds.length + } + } + if (skewCount == 0) { + partition + } else { + getDataSkewPartition(partition) + } + } + + def getDataSkewPartition(partition: Int): Int = { + var index = -1 + if (partition <= skewIndexes(skewIndexesLen - 1) && partition >= skewIndexes(0)) { + for (i <- 0 until skewIndexesLen) { + if (skewIndexes(i) == partition) { + index = i + } + } + } + if (index == -1) { + partition + } else { + nextPartition(index) + } + } + + def nextPartition(index: Int): Int = { + dsps(index).nextPartition() + } + + override def equals(other: Any): Boolean = { + other match { + case r: DataSkewRangePartitioner[_, _] => + r.rangeBounds.sameElements(rangeBounds) + case _ => + false + } + } + + override def hashCode(): Int = { + val prime = 31 + var result = 1 + var i = 0 + while (i < rangeBounds.length) { + result = prime * result + rangeBounds(i).hashCode + i += 1 + } + result = prime * result + result + } + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream): Unit = { + Utils.tryOrIOException { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => out.defaultWriteObject() + case _ => + out.writeInt(skewCount) + if (skewCount > 0) { + out.writeObject(skewIndexes) + out.writeObject(skewWeights) + } + out.writeObject(ordering) + out.writeObject(binarySearch) + + val ser = sfactory.newInstance() + Utils.serializeViaNestedStream(out, ser) { stream => + stream.writeObject(scala.reflect.classTag[Array[K]]) + stream.writeObject(rangeBounds) + } + } + } + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = { + Utils.tryOrIOException { + needInit = true + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => in.defaultReadObject() + case _ => + skewCount = in.readInt() + if (skewCount > 0) { + skewIndexes = in.readObject().asInstanceOf[Array[Int]] + skewWeights = in.readObject().asInstanceOf[Array[Int]] + } + ordering = in.readObject().asInstanceOf[Ordering[K]] + binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int] + + val ser = sfactory.newInstance() + Utils.deserializeViaNestedStream(in, ser) { ds => + implicit val classTag = ds.readObject[ClassTag[Array[K]]]() + rangeBounds = ds.readObject[Array[K]]() + } + } + } + } +} + +class DataSkewPartitioner(originPart: Int, previousPart: Int, skewWeight: Int) { + var index = 0 + + def nextPartition(): Int = { + if (index == 0) { + index = index + 1 + originPart + } else { + val newPartition = previousPart + index + index = index + 1 + if (index == skewWeight) { + index = 0 + } + newPartition + } + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 9c7d28b06af..4b148798b9f 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -1103,7 +1103,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE", "SORT_COLUMN_BOUNDS", - "LOAD_MIN_SIZE_INMB" + "LOAD_MIN_SIZE_INMB", + "RANGE_COLUMN", + "SCALE_FACTOR" ) var 
isSupported = true val invalidOptions = StringBuilder.newBuilder diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 92d1791e23a..02acde695aa 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -351,6 +351,12 @@ object CarbonDataRDDFactory { } else { status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf) + } else if (dataFrame.isEmpty && isSortTable && + carbonLoadModel.getRangePartitionColumn != null && + (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) || + sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) { + DataLoadProcessBuilderOnSpark + .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf) } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession, dataFrame, carbonLoadModel, hadoopConf) diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java index f01aea80177..d3334900319 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java @@ -30,6 +30,7 @@ import com.univocity.parsers.csv.CsvParserSettings; import org.apache.commons.io.input.BOMInputStream; import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -74,6 +75,10 @@ public class CSVInputFormat extends FileInputFormat optionsFinal, + CarbonLoadModel carbonLoadModel) throws InvalidLoadOptionException { + String rangeColumn = optionsFinal.get("range_column"); + if (rangeColumn != null) { + carbonLoadModel + .setRangePartitionColumn(table.getColumnByName(table.getTableName(), rangeColumn)); + if (carbonLoadModel.getRangePartitionColumn() == null) { + throw new InvalidLoadOptionException("Invalid range_column option"); + } + } + + String scaleFactor = optionsFinal.get("scale_factor"); + if (scaleFactor != null) { + try { + int scale = Integer.parseInt(scaleFactor); + if (scale < 1 || scale > 300) { + throw new InvalidLoadOptionException( + "Invalid scale_factor option, the range of scale_factor should be [1, 300]"); + } + carbonLoadModel.setScaleFactor(scale); + } catch (NumberFormatException ex) { + throw new InvalidLoadOptionException( + "Invalid scale_factor option, scale_factor should be a integer"); + } + } } private int validateMaxColumns(String[] csvHeaders, String maxColumns) diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 78049a458c0..419824b3980 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -193,6 +193,9 @@ public static 
Map fillOptionWithDefaultValue( optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)); + + optionsFinal.put("range_column", Maps.getOrDefault(options, "range_column", null)); + optionsFinal.put("scale_factor", Maps.getOrDefault(options, "scale_factor", null)); return optionsFinal; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java new file mode 100644 index 00000000000..ab91ca6188e --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RangeColumnParserImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.parser.impl; + +import java.util.ArrayList; + +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.loading.parser.CarbonParserFactory; +import org.apache.carbondata.processing.loading.parser.GenericParser; +import org.apache.carbondata.processing.loading.parser.RowParser; + +public class RangeColumnParserImpl implements RowParser { + + private GenericParser genericParser; + + public RangeColumnParserImpl(DataField rangeField, CarbonDataLoadConfiguration configuration) { + String[] complexDelimiters = + (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS); + ArrayList complexDelimiterList = new ArrayList<>(complexDelimiters.length); + for (int index = 0; index < complexDelimiters.length; index++) { + complexDelimiterList.add(complexDelimiters[index]); + } + String nullFormat = + configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT) + .toString(); + genericParser = + CarbonParserFactory.createParser(rangeField.getColumn(), complexDelimiterList, nullFormat); + } + + @Override + public Object[] parseRow(Object[] row) { + if (row == null || row.length < 1) { + return new String[1]; + } + Object[] result = new Object[1]; + result[0] = genericParser.parse(row[0]); + return result; + } + +}
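For quick reference, a usage sketch (not part of the patch) showing how the new load options fit together, in the same style as TestRangeColumnDataLoad above; the table name is illustrative, and sql() and filePath come from the QueryTest harness:

// Illustrative only: RANGE_COLUMN with SCALE_FACTOR, then with GLOBAL_SORT_PARTITIONS,
// mirroring the test cases in TestRangeColumnDataLoad.
sql(
  """
    | CREATE TABLE carbon_range_column_usage(id INT, name STRING, city STRING, age INT)
    | STORED BY 'org.apache.carbondata.format'
    | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
  """.stripMargin)

// range_column accepts exactly one column; scale_factor must be an integer in [1, 300].
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column_usage " +
  "OPTIONS('range_column'='name', 'scale_factor'='10')")

// GLOBAL_SORT_PARTITIONS takes precedence over scale_factor and fixes the number of ranges directly.
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column_usage " +
  "OPTIONS('range_column'='name', 'GLOBAL_SORT_PARTITIONS'='5')")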
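When GLOBAL_SORT_PARTITIONS is not set, the number of range partitions comes from getNumPartitions above. A minimal self-contained sketch of that arithmetic, with assumed sizes purely for illustration (the 1024 MB block size and 64 MB blocklet size below are assumptions, not values taken from this patch):

// Mirrors the arithmetic in DataLoadProcessBuilderOnSpark.getNumPartitions: one range
// partition should end up as roughly one CarbonData file of about TABLE_BLOCKSIZE.
def numRangePartitions(totalCsvBytes: Long, blockSizeMB: Int, blockletSizeMB: Int,
    scaleFactor: Int): Int = {
  val blockSize = 1024L * 1024 * blockSizeMB
  val blockletSize = 1024L * 1024 * blockletSizeMB
  // The CSV input is assumed to be about scaleFactor times larger than the CarbonData it produces.
  val splitSize = math.max(blockletSize, blockSize - blockletSize) * scaleFactor
  math.ceil(totalCsvBytes.toDouble / splitSize).toInt
}

// Example with assumed inputs: 12 GB of CSV, 1024 MB block, 64 MB blocklet, default scale factor 3
// => splitSize = (1024 - 64) MB * 3 = 2880 MB, so ceil(12288 / 2880) = 5 range partitions.
println(numRangePartitions(12L * 1024 * 1024 * 1024, 1024, 64, 3))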