Skip to content

Commit

Permalink
[CARBONDATA-2879] [CARBONDATA-2918] support sort scope for sdk and ma…
Browse files Browse the repository at this point in the history
…ke sort column case insensitive

Problem: [CARBONDATA-2879] SDK doesn't support batch sort and no sort, as there is no API support for sort scope.

Solution: provide sort_scope in the existing load_options, so the user can decide which sort scope to use.
Currently supported: batch_sort, local_sort and no_sort.
Global sort is not applicable in the SDK scenario, hence it is not supported.
Updated the method header and doc also

Problem: [CARBONDATA-2918]
Currently in CarbonWriterBuilder,
sortColumnsList.indexOf(field.getFieldName()) returns -1 due to a case mismatch, so one entry of the sort-columns array is left null and one of the column schemas becomes null;
hence, when that column schema is accessed, we get an NPE.

Solution: change parent column names (sort columns are supported only for parent columns) to lower case when the schema is received from the user,
and also change the sort columns to lower case when they are received from the user.

This closes #2692
  • Loading branch information
ajantha-bhat authored and ravipesala committed Sep 8, 2018
1 parent 6e50c1c commit f5c7a19
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 42 deletions.
26 changes: 14 additions & 12 deletions docs/sdk-guide.md
Expand Up @@ -339,6 +339,7 @@ public CarbonWriterBuilder taskNo(long taskNo);
* g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
* h. quotechar
* i. escapechar
* j. sort_scope -- "local_sort", "no_sort", "batch_sort"
*
* Default values are as follows.
*
Expand All @@ -351,6 +352,7 @@ public CarbonWriterBuilder taskNo(long taskNo);
* g. complex_delimiter_level_2 -- ":"
* h. quotechar -- "\""
* i. escapechar -- "\\"
* j. sort_scope -- "local_sort"
*
* @return updated CarbonWriterBuilder
*/
Expand All @@ -359,18 +361,18 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);

```
/**
* To support the table properties for sdk writer
*
* @param options key,value pair of create table properties.
* supported keys values are
* a. blocksize -- [1-2048] values in MB. Default value is 1024
* b. blockletsize -- values in MB. Default value is 64 MB
* c. localDictionaryThreshold -- positive value, default is 10000
* d. enableLocalDictionary -- true / false. Default is false
* e. sortcolumns -- comma separated column. "c1,c2". Default all dimensions are sorted.
*
* @return updated CarbonWriterBuilder
*/
* To support the table properties for sdk writer
*
* @param options key,value pair of create table properties.
* supported keys values are
* a. table_blocksize -- [1-2048] values in MB. Default value is 1024
* b. table_blocklet_size -- values in MB. Default value is 64 MB
* c. local_dictionary_threshold -- positive value, default is 10000
* d. local_dictionary_enable -- true / false. Default is false
* e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
*
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withTableProperties(Map<String, String> options);
```

Expand Down
Expand Up @@ -126,7 +126,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sortColumns: List[String]): Any = {
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"NaMe\":\"string\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"height\":\"double\"}\n")
.append("]")
Expand Down Expand Up @@ -350,7 +350,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

test(" test csv fileheader for transactional table") {
FileUtils.deleteDirectory(new File(writerPath))
buildTestDataWithSameUUID(3, false, null, List("name"))
buildTestDataWithSameUUID(3, false, null, List("Name"))
assert(new File(writerPath).exists())

sql("DROP TABLE IF EXISTS sdkOutputTable")
Expand Down Expand Up @@ -379,7 +379,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {

test("test count star with multiple loads files with same schema and UUID") {
FileUtils.deleteDirectory(new File(writerPath))
buildTestDataWithSameUUID(3, false, null, List("name"))
buildTestDataWithSameUUID(3, false, null, List("namE"))
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
Expand Down Expand Up @@ -2356,6 +2356,46 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
}

test("test Sort Scope with SDK") {
cleanTestData()
// test with no_sort
val options = Map("sort_scope" -> "no_sort").asJava
val fields: Array[Field] = new Array[Field](4)
fields(0) = new Field("stringField", DataTypes.STRING)
fields(1) = new Field("intField", DataTypes.INT)
val writer: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
.withLoadOptions(options)
.buildWriterForCSVInput(new Schema(fields))
writer.write(Array("carbon", "1"))
writer.write(Array("hydrogen", "10"))
writer.write(Array("boron", "4"))
writer.write(Array("zirconium", "5"))
writer.close()

// read no sort data
sql("DROP TABLE IF EXISTS sdkTable")
sql(
s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION '$writerPath' """
.stripMargin)
checkAnswer(sql("select * from sdkTable"),
Seq(Row("carbon", 1), Row("hydrogen", 10), Row("boron", 4), Row("zirconium", 5)))

// write local sort data
val writer1: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
.buildWriterForCSVInput(new Schema(fields))
writer1.write(Array("carbon", "1"))
writer1.write(Array("hydrogen", "10"))
writer1.write(Array("boron", "4"))
writer1.write(Array("zirconium", "5"))
writer1.close()
// read both-no sort and local sort data
checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(8)))
sql("DROP TABLE sdkTable")
cleanTestData()
}

test("test LocalDictionary with True") {
FileUtils.deleteDirectory(new File(writerPath))
val builder = CarbonWriter.builder.isTransactionalTable(false)
Expand All @@ -2379,10 +2419,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
test("test LocalDictionary with custom Threshold") {
FileUtils.deleteDirectory(new File(writerPath))
val tablePropertiesMap: util.Map[String, String] =
Map("blocksize" -> "12",
"sortcolumns" -> "name",
"localDictionaryThreshold" -> "200",
"enableLocalDictionary" -> "true").asJava
Map("table_blocksize" -> "12",
"sort_columns" -> "name",
"local_dictionary_threshold" -> "200",
"local_dictionary_enable" -> "true").asJava
val builder = CarbonWriter.builder.isTransactionalTable(false)
.withTableProperties(tablePropertiesMap)
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
Expand Down
Expand Up @@ -74,6 +74,8 @@ public CarbonLoadModel build(Map<String, String> options, long UUID, String tas
optionsFinal.put("fileheader", Strings.mkString(columns, ","));
}
optionsFinal.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options, table));
optionsFinal.put("sort_scope",
Maps.getOrDefault(options, "sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
CarbonLoadModel model = new CarbonLoadModel();
model.setCarbonTransactionalTable(table.isTransactionalTable());
model.setFactTimeStamp(UUID);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
Expand Down Expand Up @@ -59,7 +60,9 @@ private Object[] convertJsonToNoDictionaryToBytes(String jsonString)
if (jsonNodeMap == null) {
return null;
}
return jsonToCarbonRecord(jsonNodeMap, dataFields);
Map<String, Object> jsonNodeMapCaseInsensitive = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
jsonNodeMapCaseInsensitive.putAll(jsonNodeMap);
return jsonToCarbonRecord(jsonNodeMapCaseInsensitive, dataFields);
} catch (IOException e) {
throw new IOException("Failed to parse Json String: " + e.getMessage());
}
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
Expand Down Expand Up @@ -96,6 +97,11 @@ public CarbonWriterBuilder outputPath(String path) {
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder sortBy(String[] sortColumns) {
if (sortColumns != null) {
for (int i = 0; i < sortColumns.length; i++) {
sortColumns[i] = sortColumns[i].toLowerCase();
}
}
this.sortColumns = sortColumns;
return this;
}
Expand Down Expand Up @@ -233,6 +239,7 @@ public CarbonWriterBuilder uniqueIdentifier(long UUID) {
* g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
* h. quotechar
* i. escapechar
* j. sort_scope -- "local_sort", "no_sort", "batch_sort"
*
* Default values are as follows.
*
Expand All @@ -245,17 +252,13 @@ public CarbonWriterBuilder uniqueIdentifier(long UUID) {
* g. complex_delimiter_level_2 -- ":"
* h. quotechar -- "\""
* i. escapechar -- "\\"
* j. sort_scope -- "local_sort"
*
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
Objects.requireNonNull(options, "Load options should not be null");
//validate the options.
if (options.size() > 9) {
throw new IllegalArgumentException("Supports only nine options now. "
+ "Refer method header or documentation");
}

for (String option: options.keySet()) {
if (!option.equalsIgnoreCase("bad_records_logger_enable") &&
!option.equalsIgnoreCase("bad_records_action") &&
Expand All @@ -265,9 +268,19 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
!option.equalsIgnoreCase("complex_delimiter_level_1") &&
!option.equalsIgnoreCase("complex_delimiter_level_2") &&
!option.equalsIgnoreCase("quotechar") &&
!option.equalsIgnoreCase("escapechar")) {
throw new IllegalArgumentException("Unsupported options. "
+ "Refer method header or documentation");
!option.equalsIgnoreCase("escapechar") &&
!option.equalsIgnoreCase("sort_scope")) {
throw new IllegalArgumentException("Unsupported option:" + option
+ ". Refer method header or documentation");
}
}
// validate sort scope
String sortScope = options.get("sort_scope");
if (sortScope != null) {
if ((!CarbonUtil.isValidSortOption(sortScope))) {
throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
} else if (sortScope.equalsIgnoreCase("global_sort")) {
throw new IllegalArgumentException("global sort is not supported");
}
}

Expand All @@ -283,11 +296,11 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
*
* @param options key,value pair of create table properties.
* supported keys values are
* a. blocksize -- [1-2048] values in MB. Default value is 1024
* b. blockletsize -- values in MB. Default value is 64 MB
* c. localDictionaryThreshold -- positive value, default is 10000
* d. enableLocalDictionary -- true / false. Default is false
* e. sortcolumns -- comma separated column. "c1,c2". Default all dimensions are sorted.
* a. table_blocksize -- [1-2048] values in MB. Default value is 1024
* b. table_blocklet_size -- values in MB. Default value is 64 MB
* c. local_dictionary_threshold -- positive value, default is 10000
* d. local_dictionary_enable -- true / false. Default is false
* e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
*
* @return updated CarbonWriterBuilder
*/
Expand All @@ -300,8 +313,8 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options) {
}

Set<String> supportedOptions = new HashSet<>(Arrays
.asList("blocksize", "blockletsize", "localdictionarythreshold", "enablelocaldictionary",
"sortcolumns"));
.asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
"local_dictionary_enable", "sort_columns"));

for (String key : options.keySet()) {
if (!supportedOptions.contains(key.toLowerCase())) {
Expand All @@ -311,15 +324,15 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options) {
}

for (Map.Entry<String, String> entry : options.entrySet()) {
if (entry.getKey().equalsIgnoreCase("equalsIgnoreCase")) {
if (entry.getKey().equalsIgnoreCase("table_blocksize")) {
this.withBlockSize(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equalsIgnoreCase("blockletsize")) {
} else if (entry.getKey().equalsIgnoreCase("table_blocklet_size")) {
this.withBlockletSize(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equalsIgnoreCase("localDictionaryThreshold")) {
} else if (entry.getKey().equalsIgnoreCase("local_dictionary_threshold")) {
this.localDictionaryThreshold(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equalsIgnoreCase("enableLocalDictionary")) {
} else if (entry.getKey().equalsIgnoreCase("local_dictionary_enable")) {
this.enableLocalDictionary((entry.getValue().equalsIgnoreCase("true")));
} else {
} else if (entry.getKey().equalsIgnoreCase("sort_columns")) {
//sort columns
String[] sortColumns = entry.getValue().split(",");
this.sortBy(sortColumns);
Expand Down Expand Up @@ -534,14 +547,13 @@ private void setCsvHeader(CarbonLoadModel model) {

public CarbonLoadModel buildLoadModel(Schema carbonSchema)
throws IOException, InvalidLoadOptionException {
this.schema = carbonSchema;
this.schema = schemaFieldNameToLowerCase(carbonSchema);
// build CarbonTable using schema
CarbonTable table = buildCarbonTable();
if (persistSchemaFile) {
// we are still using the traditional carbon table folder structure
persistSchemaFile(table, CarbonTablePath.getSchemaFilePath(path));
}

// build LoadModel
return buildLoadModel(table, UUID, taskNo, options);
}
Expand Down Expand Up @@ -722,4 +734,19 @@ private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String task
setCsvHeader(build);
return build;
}

/* loop through all the parent column and change fields name lower case.
* this is to match with sort column case */
private Schema schemaFieldNameToLowerCase(Schema schema) {
if (schema == null) {
return null;
}
Field[] fields = schema.getFields();
for (int i = 0; i < fields.length; i++) {
if (fields[i] != null) {
fields[i].updateNameToLowerCase();
}
}
return new Schema(fields);
}
}
Expand Up @@ -213,4 +213,9 @@ public String getColumnComment() {
public void setColumnComment(String columnComment) {
this.columnComment = columnComment;
}

/*can use to change the case of the schema */
public void updateNameToLowerCase() {
this.name = name.toLowerCase();
}
}

0 comments on commit f5c7a19

Please sign in to comment.