[CARBONDATA-3356] Provide a better exception for users to solve the problem when CarbonData DataSource reads SDK files with varchar

fix error

optimize

optimize
xubo245 committed Jun 12, 2019
1 parent 0a113ce commit 746698e
Showing 5 changed files with 183 additions and 7 deletions.
@@ -264,6 +264,16 @@ public static boolean isSameColumnAndDifferentDatatypeInSchema(
.equalsIgnoreCase(tableColumnList.get(i).getColumnName()) && !indexFileColumnList.get(j)
.getDataType().getName()
.equalsIgnoreCase(tableColumnList.get(i).getDataType().getName())) {
if ("varchar".equalsIgnoreCase(indexFileColumnList.get(j).getDataType().getName()) &&
"string".equalsIgnoreCase(tableColumnList.get(i).getDataType().getName())) {
throw new RuntimeException("Datatype of the Column "
+ indexFileColumnList.get(j).getDataType().getName()
+ " present in index file, is varchar and not same as datatype of the column " +
"with same name present in table, " +
"because carbon convert varchar of carbon to string of spark, " +
"please set long_string_columns for varchar column: "
+ tableColumnList.get(i).getColumnName());
}
LOG.error("Datatype of the Column " + indexFileColumnList.get(j).getColumnName()
+ " present in index file, is not same as datatype of the column with same name"
+ "present in table");
15 changes: 8 additions & 7 deletions docs/carbon-as-spark-datasource-guide.md
@@ -42,19 +42,20 @@ Now you can create Carbon table using Spark's datasource DDL syntax.

| Property | Default Value | Description |
|-----------|--------------|------------|
| table_blocksize | 1024 | Size of blocks to write onto HDFS. For more details, see [Table Block Size Configuration](./ddl-of-carbondata.md#table-block-size-configuration). |
| table_blocklet_size | 64 | Size of blocklet to write. |
| table_page_size_inmb | 0 | Size of each page in the carbon table, in MB. If the page size crosses this value before 32000 rows, the page is cut at that row count. Helps keep the page size within the cache size. |
| local_dictionary_threshold | 10000 | Cardinality up to which the local dictionary can be generated. For more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
| local_dictionary_enable | false | Enable local dictionary generation. For more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
| sort_columns | all dimensions are sorted | Columns to include in the sort and their sort order. For more details, see [Sort Columns Configuration](./ddl-of-carbondata.md#sort-columns-configuration). |
| sort_scope | local_sort | Sort scope of the load. Options include no sort, local sort, batch sort, and global sort. For more details, see [Sort Scope Configuration](./ddl-of-carbondata.md#sort-scope-configuration). |
| long_string_columns | null | Comma-separated string/char/varchar columns whose length exceeds 32K characters. For more details, see [String longer than 32000 characters](./ddl-of-carbondata.md#string-longer-than-32000-characters). |

**NOTE:** Please set long_string_columns for varchar columns, as shown in the second example below.
## Example

```
CREATE TABLE CARBON_TABLE (NAME STRING) USING CARBON OPTIONS('table_blocksize'='256')
```
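Following the note above, a table created over SDK output containing varchar columns would additionally pass the long_string_columns option. A sketch mirroring the test added in this commit (the location path is illustrative):

```
CREATE TABLE CARBON_TABLE2
USING CARBON
OPTIONS('long_string_columns'='varcharField,varcharField2')
LOCATION '/path/to/sdk/output'
```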

# Using DataFrame
@@ -22,6 +22,7 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.carbondata.datasource.TestUtil._
@@ -38,6 +39,84 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {


var writerOutputPath = new File(this.getClass.getResource("/").getPath
+ "../../target/SparkCarbonFileFormat/SDKWriterOutput/").getCanonicalPath
// getCanonicalPath gives a path with \, but the code expects /.
writerOutputPath = writerOutputPath.replace("\\", "/")

def buildTestData(rows: Int, sortColumns: List[String]): Unit = {
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"stringField\":\"string\"},\n")
.append(" {\"byteField\":\"byte\"},\n")
.append(" {\"shortField\":\"short\"},\n")
.append(" {\"intField\":\"int\"},\n")
.append(" {\"longField\":\"long\"},\n")
.append(" {\"doubleField\":\"double\"},\n")
.append(" {\"floatField\":\"float\"},\n")
.append(" {\"decimalField\":\"decimal(17,2)\"},\n")
.append(" {\"boolField\":\"boolean\"},\n")
.append(" {\"dateField\":\"DATE\"},\n")
.append(" {\"timeField\":\"TIMESTAMP\"},\n")
.append(" {\"varcharField\":\"varchar\"},\n")
.append(" {\"varcharField2\":\"varchar\"}\n")
.append("]")
.toString()

try {
val builder = CarbonWriter.builder()
val writer =
builder.outputPath(writerOutputPath)
.sortBy(sortColumns.toArray)
.uniqueIdentifier(System.currentTimeMillis)
.withBlockSize(2)
.withCsvInput(Schema.parseJson(schema))
.writtenBy("TestNonTransactionalCarbonTable")
.build()
var i = 0
while (i < rows) {
writer.write(Array[String]("robot" + i,
String.valueOf(i / 100),
String.valueOf(i / 100),
String.valueOf(i),
String.valueOf(i),
String.valueOf(i),
String.valueOf(i),
String.valueOf(i),
"true",
"2019-03-02",
"2019-02-12 03:03:34",
"var1",
"var2"))
i += 1
}
writer.close()
} catch {
case ex: Throwable => throw new RuntimeException(ex)
}
}

test("Carbon DataSource read SDK data with varchar") {
import spark._
FileUtils.deleteDirectory(new File(writerOutputPath))
val num = 10000
buildTestData(num, List("stringField", "intField"))
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
sql("DROP TABLE IF EXISTS carbontable_varchar")
sql("DROP TABLE IF EXISTS carbontable_varchar2")
sql(s"CREATE TABLE carbontable_varchar USING CARBON LOCATION '$writerOutputPath'")
val e = intercept[Exception] {
sql("SELECT COUNT(*) FROM carbontable_varchar").show()
}
assert(e.getMessage.contains("Datatype of the column VARCHAR present in the index file is varchar and does not match the datatype of the column with the same name in the table, because Carbon converts its varchar type to Spark's string type. Please set long_string_columns for the varchar column"))

sql(s"CREATE TABLE carbontable_varchar2 USING CARBON OPTIONS('long_String_columns'='varcharField,varcharField2') LOCATION '$writerOutputPath'")
checkAnswer(sql("SELECT COUNT(*) FROM carbontable_varchar2"), Seq(Row(num)))
}
}

test("test write using dataframe") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
20 changes: 20 additions & 0 deletions store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -20,6 +20,8 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -80,6 +82,24 @@ public Field(String name, String type) {
this.type = DataTypes.DOUBLE;
} else if (type.equalsIgnoreCase("binary")) {
this.type = DataTypes.BINARY;
} else if (type.toLowerCase().startsWith("decimal")) {
if ("decimal".equalsIgnoreCase(type)) {
this.type = DataTypes.createDefaultDecimalType();
} else {
try {
// Extract the "precision,scale" pair from a string such as "decimal(10,2)";
// whitespace around either number is tolerated. Match against the lowercased
// type so that "DECIMAL(10,2)" parses as well.
Matcher m = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(type.toLowerCase());
m.find();
String[] precisionAndScale = m.group(1).split(",");
precision = Integer.parseInt(precisionAndScale[0].trim());
scale = Integer.parseInt(precisionAndScale[1].trim());
this.type = DataTypes.createDecimalType(precision, scale);
} catch (Exception e) {
throw new IllegalArgumentException("unsupported data type: " + type
+ ". Please use decimal or decimal(precision,scale), "
+ "for example: decimal(10,2)", e);
}
}
} else if (type.equalsIgnoreCase("array")) {
this.type = DataTypes.createDefaultArrayType();
} else if (type.equalsIgnoreCase("struct")) {
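The decimal branch above hinges on a small regex. A self-contained sketch of the same parsing logic (standalone and purely illustrative, not part of this commit), showing how "decimal( 17, 3)" yields precision 17 and scale 3:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DecimalParseDemo {
  public static void main(String[] args) {
    String type = "decimal( 17, 3)";
    // Capture everything between the parentheses, then split on the comma.
    Matcher m = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(type.toLowerCase());
    if (m.find()) {
      String[] precisionAndScale = m.group(1).split(",");
      int precision = Integer.parseInt(precisionAndScale[0].trim()); // 17
      int scale = Integer.parseInt(precisionAndScale[1].trim());     // 3
      System.out.println("precision=" + precision + ", scale=" + scale);
    }
  }
}
```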
@@ -33,6 +33,7 @@
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -106,6 +107,71 @@ public void testWriteFilesJsonSchema() throws IOException {
FileUtils.deleteDirectory(new File(path));
}

@Test
public void testWriteJsonSchemaWithDefaultDecimal() {
String jsonSchema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"height\":\"double\"},\n")
.append(" {\"decimalField\":\"decimal\"}\n")
.append("]")
.toString();
Schema schema = Schema.parseJson(jsonSchema);
assert (10 == ((DecimalType) schema.getFields()[3].getDataType()).getPrecision());
assert (2 == ((DecimalType) schema.getFields()[3].getDataType()).getScale());
}

@Test
public void testWriteJsonSchemaWithCustomDecimal() {
String jsonSchema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"height\":\"double\"},\n")
.append(" {\"decimalField\":\"decimal(17,3)\"}\n")
.append("]")
.toString();
Schema schema = Schema.parseJson(jsonSchema);
assert (17 == ((DecimalType) schema.getFields()[3].getDataType()).getPrecision());
assert (3 == ((DecimalType) schema.getFields()[3].getDataType()).getScale());
}

@Test
public void testWriteJsonSchemaWithCustomDecimalAndSpace() {
String jsonSchema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"height\":\"double\"},\n")
.append(" {\"decimalField\":\"decimal( 17, 3)\"}\n")
.append("]")
.toString();
Schema schema = Schema.parseJson(jsonSchema);
assert (17 == ((DecimalType) schema.getFields()[3].getDataType()).getPrecision());
assert (3 == ((DecimalType) schema.getFields()[3].getDataType()).getScale());
}

@Test
public void testWriteJsonSchemaWithImproperDecimal() {
String jsonSchema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"height\":\"double\"},\n")
.append(" {\"decimalField\":\"decimal( 17, )\"}\n")
.append("]")
.toString();
try {
Schema.parseJson(jsonSchema);
assert (false);
} catch (Exception e) {
assert (e.getMessage().contains("unsupported data type: decimal( 17, ). " +
"Please use decimal or decimal(precision,scale), " +
"for example: decimal(10,2)"));
}
}

@Test
public void testWriteFilesBuildWithJsonSchema() throws IOException, InvalidLoadOptionException, InterruptedException {
String path = "./testWriteFilesJsonSchema";
