Commit

Merge bf85061 into 8c49e5b
xuchuanyin committed Oct 22, 2018
2 parents 8c49e5b + bf85061 commit de954f8
Showing 2 changed files with 31 additions and 30 deletions.
@@ -19,7 +19,6 @@ package org.apache.spark.sql.carbondata.datasource
 
 import java.io.File
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
@@ -30,17 +29,14 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.spark.util.SparkUtil
 import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants}
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter}
 import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -433,14 +429,16 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll
       .append("[ \n")
       .append(" {\"name\":\"string\"},\n")
       .append(" {\"address\":\"varchar\"},\n")
-      .append(" {\"age\":\"int\"}\n")
+      .append(" {\"age\":\"int\"},\n")
+      .append(" {\"note\":\"varchar\"}\n")
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
-    for (i <- 0 until 3) {
+    val totalRecordsNum = 3
+    for (i <- 0 until totalRecordsNum) {
       // write a varchar with 75,000 length
-      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
+      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString, RandomStringUtils.randomAlphabetic(75000)))
     }
     writer.close()
 
@@ -449,19 +447,19 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll
     if (spark.sparkContext.version.startsWith("2.1")) {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
-           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
+           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
          .stripMargin)
     } else {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
-           |OPTIONS("long_String_columns"="address") LOCATION
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon
+           |OPTIONS("long_String_columns"="address, note") LOCATION
           |'$writerPath' """.stripMargin)
     }
-    assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
-    val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
-    assert(op.get(0).getString(0).length == 75000)
+    checkAnswer(spark.sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
+    checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
     spark.sql("DROP TABLE sdkOutputTable")
 
     //--------------- data source external table without schema ---------------------------
@@ -471,16 +469,16 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll
       spark
         .sql(
           s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
-             |'$writerPath', "long_String_columns" "address") """.stripMargin)
+             |'$writerPath', "long_String_columns" "address, note") """.stripMargin)
     } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
-          |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
+          |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin)
     }
-    assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
-    val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
-    assert(op1.get(0).getString(0).length == 75000)
+    checkAnswer(spark.sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
+    checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
     spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
     clearDataMapCache
     cleanTestData()
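For context, the test above drives the CarbonData SDK from Scala; the equivalent Java write path looks roughly as follows. This is a minimal sketch built only from the API calls visible in this diff (builder(), outputPath, withCsvInput, build, write, close); the output path and the repeat helper are illustrative, not part of the patch. Declaring a column as "varchar" in the schema JSON marks it as a long string column, which is how the test writes 75,000-character values.

import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Schema;

public class VarcharWriteSketch {
  public static void main(String[] args) throws Exception {
    // Schema JSON mirroring the test: "varchar" columns hold long strings.
    String schemaJson = "[\n"
        + " {\"name\":\"string\"},\n"
        + " {\"address\":\"varchar\"},\n"
        + " {\"age\":\"int\"},\n"
        + " {\"note\":\"varchar\"}\n"
        + "]";
    String writerPath = "/tmp/carbon_varchar_sketch"; // illustrative path
    CarbonWriter writer = CarbonWriter.builder()
        .outputPath(writerPath)
        .withCsvInput(Schema.parseJson(schemaJson))
        .build();
    for (int i = 0; i < 3; i++) {
      // Two long values per row, mirroring the 75,000-char payloads in the test.
      writer.write(new String[]{"name_" + i, repeat('a', 75000),
          String.valueOf(i), repeat('b', 75000)});
    }
    writer.close();
  }

  // Small stand-in for the RandomStringUtils.randomAlphabetic call in the test.
  private static String repeat(char c, int n) {
    StringBuilder sb = new StringBuilder(n);
    for (int i = 0; i < n; i++) {
      sb.append(c);
    }
    return sb.toString();
  }
}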
@@ -32,6 +32,7 @@
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -412,10 +413,13 @@ private void setCsvHeader(CarbonLoadModel model) {
   public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
     timestamp = System.nanoTime();
-    Set<String> longStringColumns = null;
-    if (options != null && options.get("long_string_columns") != null) {
-      longStringColumns =
-          new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(",")));
+    Set<String> longStringColumns = new HashSet<>();
+    if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {
+      String[] specifiedLongStrings =
+          options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(",");
+      for (String str : specifiedLongStrings) {
+        longStringColumns.add(str.trim());
+      }
       validateLongStringColumns(carbonSchema, longStringColumns);
     }
     this.schema = updateSchemaFields(carbonSchema, longStringColumns);
@@ -603,12 +607,11 @@ private Schema updateSchemaFields(Schema schema, Set<String> longStringColumns)
     for (int i = 0; i < fields.length; i++) {
       if (fields[i] != null) {
         fields[i].updateNameToLowerCase();
-      }
-
-      if (longStringColumns != null) {
-        /* Also update the string type to varchar */
-        if (longStringColumns.contains(fields[i].getFieldName())) {
-          fields[i].updateDataTypeToVarchar();
+        if (longStringColumns != null) {
+          /* Also update the string type to varchar */
+          if (longStringColumns.contains(fields[i].getFieldName())) {
+            fields[i].updateDataTypeToVarchar();
+          }
         }
       }
     }
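The functional core of the builder change is the trim step. Without it, an option value like "address, note" splits into "address" and " note"; the second entry keeps its leading space, never matches the lower-cased field name "note", and that column is silently left as a plain string instead of being widened to varchar. A standalone sketch of the before/after parsing (the class and method names here are illustrative, not from the patch):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class LongStringOptionParsingSketch {
  // Patched behavior: lowercase, split on commas, then trim each entry.
  static Set<String> parseTrimmed(String option) {
    Set<String> columns = new HashSet<>();
    for (String name : option.toLowerCase().split(",")) {
      columns.add(name.trim());
    }
    return columns;
  }

  public static void main(String[] args) {
    String option = "address, Note";
    // Old behavior: split only, so the second entry keeps its leading space.
    Set<String> old = new HashSet<>(Arrays.asList(option.toLowerCase().split(",")));
    System.out.println(old.contains("note"));                  // false: set holds " note"
    System.out.println(parseTrimmed(option).contains("note")); // true
  }
}

The companion change from a null set to an empty HashSet means updateSchemaFields now always receives a non-null set; the null check kept inside it is purely defensive.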
