[CARBONDATA-3219] Support range partitioning of the input data for local_sort/global_sort data loading

For a global_sort/local_sort table, the LOAD DATA command now accepts a RANGE_COLUMN option:

LOAD DATA INPATH '<path>' INTO TABLE <table name>
OPTIONS('RANGE_COLUMN'='<a column>')

When we know the total size of the input data, we can specify the number of partitions directly:

LOAD DATA INPATH '<path>' INTO TABLE <table name>
OPTIONS('RANGE_COLUMN'='<a column>', 'GLOBAL_SORT_PARTITIONS'='10')

When we don't know the total size of the input data, we can instead control the size of each partition with SCALE_FACTOR:

LOAD DATA INPATH '<path>' INTO TABLE <table name>
OPTIONS('RANGE_COLUMN'='<a column>', 'SCALE_FACTOR'='10')

In this case the number of partitions is calculated as follows:

splitSize = Math.max(blocklet_size, block_size - blocklet_size) * scale_factor
numPartitions = Math.ceil(total_size / splitSize)
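
For example, with a 1024 MB block size and a 64 MB blocklet size (illustrative defaults, not values set by this commit) and SCALE_FACTOR = 3: splitSize = max(64 MB, 1024 MB - 64 MB) * 3 = 2880 MB, so 10 GB of input yields numPartitions = ceil(10240 / 2880) = 4.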
Limitations:

- Only the LOAD DATA command is supported; INSERT INTO is not.
- Only a single range column is supported; multiple range columns are not.
- Data skew can still occur.

This closes #2971
QiangCai authored and ravipesala committed Jan 21, 2019
1 parent 310c67d commit 8f81507
Showing 14 changed files with 1,040 additions and 14 deletions.
CarbonLoadOptionConstants.java
@@ -165,4 +165,11 @@ public final class CarbonLoadOptionConstants {
= "carbon.load.sortmemory.spill.percentage";

public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT = "0";

/**
* For Range_Column, it will use SCALE_FACTOR to control the size of each partition.
* When SCALE_FACTOR is about the compression ratio, each task will generate one CarbonData file.
* And the size of the file is about TABLE_BLOCKSIZE of this table.
*/
public static final int CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = 3;
}
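
To see why the default of 3 is reasonable (an illustrative reading of the comment above, not code in this commit): with SCALE_FACTOR = 3 a task reads splitSize = (block_size - blocklet_size) * 3 bytes of raw input; if that input compresses at roughly 3:1, the task writes about block_size - blocklet_size bytes, i.e. close to one file of TABLE_BLOCKSIZE.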
New test data file (CSV) under resources/range_column; the empty name values produce the skew exercised by the data-skew test below:
@@ -0,0 +1,11 @@
id,name,city,age
1,,wuhan,91
2,,hangzhou,102
3,,beijing,112
4,,shenzhen,124
5,e,shenzhen,65
6,f,beijing,76
7,g,hangzhou,37
8,h,wuhan,48
9,i,,89
10,j,,50
New test file TestRangeColumnDataLoad.scala:
@@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.spark.testsuite.dataload

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.path.CarbonTablePath

class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
  var filePath: String = s"$resourcesPath/globalsort"

  override def beforeAll(): Unit = {
    dropTable
  }

  override def afterAll(): Unit = {
    dropTable
  }

  def dropTable(): Unit = {
    sql("DROP TABLE IF EXISTS carbon_range_column1")
    sql("DROP TABLE IF EXISTS carbon_range_column2")
    sql("DROP TABLE IF EXISTS carbon_range_column3")
    sql("DROP TABLE IF EXISTS carbon_range_column4")
  }

  test("range_column with option GLOBAL_SORT_PARTITIONS") {
    sql(
      """
        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
        | STORED BY 'org.apache.carbondata.format'
        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
      """.stripMargin)

    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
        "OPTIONS('GLOBAL_SORT_PARTITIONS'='1', 'range_column'='name')")

    assert(getIndexFileCount("carbon_range_column1") === 1)
    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column1"), Seq(Row(12)))
    checkAnswer(sql("SELECT * FROM carbon_range_column1"),
      sql("SELECT * FROM carbon_range_column1 ORDER BY name"))
  }

  test("range_column with option scale_factor") {
    sql(
      """
        | CREATE TABLE carbon_range_column2(id INT, name STRING, city STRING, age INT)
        | STORED BY 'org.apache.carbondata.format'
        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
      """.stripMargin)

    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column2 " +
        "OPTIONS('scale_factor'='10', 'range_column'='name')")

    assert(getIndexFileCount("carbon_range_column2") === 1)
    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column2"), Seq(Row(12)))
    checkAnswer(sql("SELECT * FROM carbon_range_column2"),
      sql("SELECT * FROM carbon_range_column2 ORDER BY name"))
  }

  test("range_column only support single column ") {
    sql(
      """
        | CREATE TABLE carbon_range_column3(id INT, name STRING, city STRING, age INT)
        | STORED BY 'org.apache.carbondata.format'
        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city')
      """.stripMargin)

    intercept[InvalidLoadOptionException] {
      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column3 " +
          "OPTIONS('scale_factor'='10', 'range_column'='name,id')")
    }
  }

  test("range_column with data skew") {
    sql(
      """
        | CREATE TABLE carbon_range_column4(id INT, name STRING, city STRING, age INT)
        | STORED BY 'org.apache.carbondata.format'
        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='name, city')
      """.stripMargin)

    val dataSkewPath = s"$resourcesPath/range_column"

    sql(s"LOAD DATA LOCAL INPATH '$dataSkewPath' INTO TABLE carbon_range_column4 " +
        "OPTIONS('GLOBAL_SORT_PARTITIONS'='5', 'range_column'='name', " +
        "'BAD_RECORDS_ACTION'='force')")

    assert(getIndexFileCount("carbon_range_column4") === 5)
    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(10)))
  }

  private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
    if (FileFactory.isFileExist(segmentDir)) {
      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
    } else {
      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
      new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
        .getIndexCarbonFiles.size()
    }
  }
}
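
The tests above exercise the feature end to end. As a minimal sketch of the underlying idea -- not CarbonData's actual code path -- Spark's RangePartitioner does the same job for a plain RDD: sample the range column, derive split points, and give each task one contiguous value range to sort and write. The object name and sample data below are hypothetical:

import org.apache.spark.RangePartitioner
import org.apache.spark.sql.SparkSession

object RangeColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("range-column-sketch").getOrCreate()
    // Key each record by the "range column" (here: name), echoing the test data above.
    val keyed = spark.sparkContext
      .parallelize(Seq("e", "f", "g", "h", "", "").map(name => (name, 1)))
    // RangePartitioner samples the keys and computes split points so that each
    // partition covers one contiguous key range -- skewed keys can still pile up.
    val partitioned = keyed.partitionBy(new RangePartitioner(3, keyed))
    partitioned.mapPartitionsWithIndex { (i, it) =>
      Iterator(s"partition $i holds keys: ${it.map(_._1).toList.sorted}")
    }.collect().foreach(println)
    spark.stop()
  }
}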
CsvRDDHelper.scala
@@ -67,15 +67,18 @@ object CsvRDDHelper {
    val jobContext = new JobContextImpl(jobConf, null)
    val inputFormat = new CSVInputFormat()
    val rawSplits = inputFormat.getSplits(jobContext).toArray
    // Track the total input size while building the splits; the load model
    // uses it to derive the number of range partitions.
    var totalLength = 0L
    val splitFiles = rawSplits.map { split =>
      val fileSplit = split.asInstanceOf[FileSplit]
      totalLength = totalLength + fileSplit.getLength
      PartitionedFile(
        InternalRow.empty,
        fileSplit.getPath.toString,
        fileSplit.getStart,
        fileSplit.getLength,
        fileSplit.getLocations)
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    model.setTotalSize(totalLength)
    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism

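A minimal sketch of how the total size captured above could feed the partition-count formula from the commit message; numPartitionsFor is a hypothetical helper, and the block/blocklet sizes are assumed illustrative values, not read from the actual table properties:

  // Illustrative only: applies the commit-message formula to a total input size.
  def numPartitionsFor(
      totalSize: Long,
      blockSize: Long = 1024L << 20, // assumed 1024 MB table block size
      blockletSize: Long = 64L << 20, // assumed 64 MB blocklet size
      scaleFactor: Int = 3): Int = {
    val splitSize = Math.max(blockletSize, blockSize - blockletSize) * scaleFactor
    Math.ceil(totalSize.toDouble / splitSize).toInt
  }

  // e.g. numPartitionsFor(10L << 30) == 4 for 10 GB of input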
