[CARBONDATA-2922] support long string columns with spark FileFormat and SDK with long_string_columns TableProperties

Problem: An exception was thrown when a long string column was used with the Spark file format.

Solution: The VARCHAR data type had no corresponding Spark data type in the converter, hence the exception. Spark places no length limit on its string type, so map VARCHAR to Spark's StringType. Tested with strings longer than 32K characters and added a UT.

Also supported the long_string_columns table property for the SDK (required for Avro) and the Spark file format.
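
A minimal sketch of the SDK usage this enables (output path and data are illustrative; the builder, Schema, Field, and DataTypes APIs are the ones shown in this diff, with imports and checked exceptions elided):

    // declare "address" as a long string column via table properties
    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("address", DataTypes.STRING);

    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put("long_string_columns", "address"); // values may exceed 32K characters

    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/carbon-output") // illustrative path
        .withTableProperties(tableProperties)
        .buildWriterForCSVInput(new Schema(fields));
    writer.write(new String[]{"name_0", RandomStringUtils.randomAlphabetic(75000)});
    writer.close();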

This closes #2690
ajantha-bhat authored and ravipesala committed Sep 10, 2018
1 parent 54dcd8d commit a9cc434
Showing 7 changed files with 161 additions and 26 deletions.
7 changes: 4 additions & 3 deletions docs/sdk-guide.md
@@ -339,8 +339,7 @@ public CarbonWriterBuilder taskNo(long taskNo);
* g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
* h. quotechar
* i. escapechar
- * j. sort_scope -- "local_sort", "no_sort", "batch_sort"
- *
*
* Default values are as follows.
*
* a. bad_records_logger_enable -- "false"
@@ -352,7 +351,6 @@ public CarbonWriterBuilder taskNo(long taskNo);
* g. complex_delimiter_level_2 -- ":"
* h. quotechar -- "\""
* i. escapechar -- "\\"
- * j. sort_scope -- "local_sort"
*
* @return updated CarbonWriterBuilder
*/
@@ -370,6 +368,9 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
* c. local_dictionary_threshold -- positive value, default is 10000
* d. local_dictionary_enable -- true / false. Default is false
* e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
* f. sort_scope -- "local_sort", "no_sort", "batch_sort". Default value is "local_sort".
* g. long_string_columns -- comma-separated string columns whose values can exceed 32K characters.
*    Default value is null.
*
* @return updated CarbonWriterBuilder
*/
@@ -2365,7 +2365,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
    fields(1) = new Field("intField", DataTypes.INT)
    val writer: CarbonWriter = CarbonWriter.builder
      .outputPath(writerPath)
-     .withLoadOptions(options)
+     .withTableProperties(options)
      .buildWriterForCSVInput(new Schema(fields))
    writer.write(Array("carbon", "1"))
    writer.write(Array("hydrogen", "10"))
@@ -228,6 +228,12 @@ object CarbonSparkDataSourceUtil {
case _ => null
}
builder.sortBy(sortCols)
    val longStringColumns: String = options
      .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, null)
    if (longStringColumns != null) {
      val loadOptions = Map(CarbonCommonConstants.LONG_STRING_COLUMNS -> longStringColumns).asJava
      builder.withTableProperties(loadOptions)
    }
builder.uniqueIdentifier(System.currentTimeMillis())
val model = builder.buildLoadModel(schema)
val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
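
A hedged sketch of how the property can reach the code above from the Spark side (the "carbon" format name follows the tests in this commit; the dataset, the output path, and the assumption that the DataFrame writer forwards arbitrary options into this map are illustrative):

    // Java Spark API sketch
    df.write()
        .format("carbon")
        .option("long_string_columns", "address")
        .save("/tmp/carbon-output");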
@@ -83,6 +83,7 @@ private[spark] object SparkTypeConverter {
      case CarbonDataTypes.BOOLEAN => BooleanType
      case CarbonDataTypes.TIMESTAMP => TimestampType
      case CarbonDataTypes.DATE => DateType
      case CarbonDataTypes.VARCHAR => StringType
    }
  }
}
@@ -20,6 +20,7 @@ package org.apache.spark.sql.carbondata.datasource
import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.util.SparkUtil
@@ -319,4 +320,70 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll {
assert(new File(filePath).exists())
cleanTestData()
}

test("Test with long string columns") {
FileUtils.deleteDirectory(new File(writerPath))
// here we specify the long string column as varchar
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"address\":\"varchar\"},\n")
.append(" {\"age\":\"int\"}\n")
.append("]")
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath)
.buildWriterForCSVInput(Schema.parseJson(schema))
for (i <- 0 until 3) {
// write a varchar with 75,000 length
writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
}
writer.close()

//--------------- data source external table with schema ---------------------------
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
if (spark.sparkContext.version.startsWith("2.1")) {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
|USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
.stripMargin)
} else if (spark.sparkContext.version.startsWith("2.2")) {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
|OPTIONS("long_String_columns"="address") LOCATION
|'$writerPath' """.stripMargin)
} else {
// TODO. spark2.3 ?
assert(false)
}
assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
assert(op.get(0).getString(0).length == 75000)
spark.sql("DROP TABLE sdkOutputTable")

//--------------- data source external table without schema ---------------------------
spark.sql("DROP TABLE IF EXISTS sdkOutputTableWithoutSchema")
if (spark.sparkContext.version.startsWith("2.1")) {
//data source file format
spark
.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
|'$writerPath', "long_String_columns" "address") """.stripMargin)
} else if (spark.sparkContext.version.startsWith("2.2")) {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
|("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
} else {
// TODO. spark2.3 ?
assert(false)
}
assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
assert(op1.get(0).getString(0).length == 75000)
spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
cleanTestData()
}
}
@@ -240,7 +240,6 @@ public CarbonWriterBuilder uniqueIdentifier(long UUID) {
* g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
* h. quotechar
* i. escapechar
- * j. sort_scope -- "local_sort", "no_sort", "batch_sort"
*
* Default values are as follows.
*
@@ -253,7 +252,6 @@ public CarbonWriterBuilder uniqueIdentifier(long UUID) {
* g. complex_delimiter_level_2 -- ":"
* h. quotechar -- "\""
* i. escapechar -- "\\"
- * j. sort_scope -- "local_sort"
*
* @return updated CarbonWriterBuilder
*/
@@ -269,26 +267,17 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
!option.equalsIgnoreCase("complex_delimiter_level_1") &&
!option.equalsIgnoreCase("complex_delimiter_level_2") &&
!option.equalsIgnoreCase("quotechar") &&
!option.equalsIgnoreCase("escapechar") &&
!option.equalsIgnoreCase("sort_scope")) {
!option.equalsIgnoreCase("escapechar")) {
throw new IllegalArgumentException("Unsupported option:" + option
+ ". Refer method header or documentation");
}
}
// validate sort scope
String sortScope = options.get("sort_scope");
if (sortScope != null) {
if ((!CarbonUtil.isValidSortOption(sortScope))) {
throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
} else if (sortScope.equalsIgnoreCase("global_sort")) {
throw new IllegalArgumentException("global sort is not supported");
}
}

// convert it to treeMap as keys need to be case insensitive
Map<String, String> optionsTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
optionsTreeMap.putAll(options);
this.options = optionsTreeMap;
if (this.options == null) {
// convert it to treeMap as keys need to be case insensitive
this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
}
this.options.putAll(options);
return this;
}

@@ -302,6 +291,9 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
* c. local_dictionary_threshold -- positive value, default is 10000
* d. local_dictionary_enable -- true / false. Default is false
* e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
* f. sort_scope -- "local_sort", "no_sort", "batch_sort". Default value is "local_sort".
* g. long_string_columns -- comma-separated string columns whose values can exceed 32K characters.
*    Default value is null.
*
* @return updated CarbonWriterBuilder
*/
@@ -315,7 +307,7 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options) {

Set<String> supportedOptions = new HashSet<>(Arrays
.asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
"local_dictionary_enable", "sort_columns"));
"local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns"));

for (String key : options.keySet()) {
if (!supportedOptions.contains(key.toLowerCase())) {
@@ -337,6 +329,10 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options) {
        // sort columns
        String[] sortColumns = entry.getValue().split(",");
        this.sortBy(sortColumns);
      } else if (entry.getKey().equalsIgnoreCase("sort_scope")) {
        this.withSortScope(entry);
      } else if (entry.getKey().equalsIgnoreCase("long_string_columns")) {
        updateToLoadOptions(entry);
      }
    }
    return this;
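
Note the migration implied by the hunks above: sort_scope is no longer accepted by withLoadOptions and must be passed through withTableProperties, which validates it and then routes it, like long_string_columns, back into the load options. A small sketch, assuming an existing builder instance:

    Map<String, String> props = new HashMap<>();
    props.put("sort_scope", "no_sort");          // validated by withSortScope(); "global_sort" throws
    props.put("long_string_columns", "address"); // forwarded via updateToLoadOptions()
    builder.withTableProperties(props);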
@@ -548,7 +544,13 @@ private void setCsvHeader(CarbonLoadModel model) {

public CarbonLoadModel buildLoadModel(Schema carbonSchema)
throws IOException, InvalidLoadOptionException {
-   this.schema = schemaFieldNameToLowerCase(carbonSchema);
+   Set<String> longStringColumns = null;
+   if (options != null && options.get("long_string_columns") != null) {
+     longStringColumns =
+         new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(",")));
+     validateLongStringColumns(carbonSchema, longStringColumns);
+   }
+   this.schema = updateSchemaFields(carbonSchema, longStringColumns);
// build CarbonTable using schema
CarbonTable table = buildCarbonTable();
if (persistSchemaFile) {
@@ -559,6 +561,28 @@ public CarbonLoadModel buildLoadModel(Schema carbonSchema)
return buildLoadModel(table, UUID, taskNo, options);
}

  private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) {
    // long string columns must be string or varchar type
    for (Field field : carbonSchema.getFields()) {
      if (longStringColumns.contains(field.getFieldName().toLowerCase()) && (
          (field.getDataType() != DataTypes.STRING) && field.getDataType() != DataTypes.VARCHAR)) {
        throw new RuntimeException(
            "long string column : " + field.getFieldName() + " is not supported for data type: "
                + field.getDataType());
      }
    }
    // long string columns must not be present in sort columns
    if (sortColumns != null) {
      for (String col : sortColumns) {
        // sort columns are already in lower case
        if (longStringColumns.contains(col)) {
          throw new RuntimeException(
              "long string column : " + col + " must not be present in sort columns");
        }
      }
    }
  }
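
For illustration, a hypothetical call that the first check above rejects (schema and path are examples; the validation runs when the builder constructs its load model):

    Field[] fields = new Field[]{new Field("age", DataTypes.INT)};
    Map<String, String> props = new HashMap<>();
    props.put("long_string_columns", "age"); // INT is neither STRING nor VARCHAR
    CarbonWriter.builder()
        .outputPath("/tmp/carbon-output")
        .withTableProperties(props)
        .buildWriterForCSVInput(new Schema(fields)); // throws RuntimeException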

/**
* Build a {@link CarbonTable}
*/
@@ -736,9 +760,11 @@ private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String task
return build;
}

- /* loop through all the parent column and change fields name lower case.
-  * this is to match with sort column case */
- private Schema schemaFieldNameToLowerCase(Schema schema) {
+ /* Loop through all the parent columns and
+  * a) change field names to lower case, to match the sort column case;
+  * b) change long string fields to varchar type. */
+ private Schema updateSchemaFields(Schema schema, Set<String> longStringColumns) {
if (schema == null) {
return null;
}
@@ -747,7 +773,36 @@ private Schema schemaFieldNameToLowerCase(Schema schema) {
      if (fields[i] != null) {
        fields[i].updateNameToLowerCase();
      }

      if (longStringColumns != null) {
        // also update the string type to varchar
        if (longStringColumns.contains(fields[i].getFieldName())) {
          fields[i].updateDataTypeToVarchar();
        }
      }
    }
    return new Schema(fields);
  }

  private void updateToLoadOptions(Map.Entry<String, String> entry) {
    if (this.options == null) {
      // convert it to treeMap as keys need to be case insensitive
      this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    }
    // update it to load options
    this.options.put(entry.getKey(), entry.getValue());
  }

  private void withSortScope(Map.Entry<String, String> entry) {
    String sortScope = entry.getValue();
    if (sortScope != null) {
      if (!CarbonUtil.isValidSortOption(sortScope)) {
        throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
      } else if (sortScope.equalsIgnoreCase("global_sort")) {
        throw new IllegalArgumentException("global sort is not supported");
      }
    }
    // update it to load options
    updateToLoadOptions(entry);
  }
}
@@ -219,6 +219,11 @@ public void setColumnComment(String columnComment) {
this.columnComment = columnComment;
}

  /* for SDK: change a parent column's string type to varchar (used for long string columns) */
  public void updateDataTypeToVarchar() {
    this.type = DataTypes.VARCHAR;
  }

  /* can be used to change the case of the schema */
public void updateNameToLowerCase() {
this.name = name.toLowerCase();
