Commit 17d1ae2

Merge 0684596 into f5c7a19
manishgupta88 committed Sep 8, 2018 (2 parents: f5c7a19 + 0684596)

Showing 6 changed files with 213 additions and 16 deletions.
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.types._

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField, StructType => CarbonStructType}
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -78,6 +78,10 @@ object CarbonSparkDataSourceUtil {
convertSparkToCarbonDataType(field.dataType)))
}
CarbonDataTypes.createStructType(carbonFields)
case MapType(keyType, valueType, _) =>
val keyDataType: CarbonDataType = convertSparkToCarbonDataType(keyType)
val valueDataType: CarbonDataType = convertSparkToCarbonDataType(valueType)
CarbonDataTypes.createMapType(keyDataType, valueDataType)
case NullType => CarbonDataTypes.NULL
case decimal: DecimalType =>
CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
@@ -196,11 +200,7 @@ object CarbonSparkDataSourceUtil {
dataSchema: StructType): CarbonLoadModel = {
val schema = new Schema(dataSchema.fields.map { field =>
val dataType = convertSparkToCarbonDataType(field.dataType)
dataType match {
case s: CarbonStructType =>
new Field(field.name, s, s.getFields)
case _ => new Field(field.name, dataType)
}
new Field(field.name, dataType)
})
val builder = new CarbonWriterBuilder
builder.isTransactionalTable(false)
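A note on the two hunks above: the new MapType case simply recurses into the key and value types, and the struct special case removed from buildLoadModel is now redundant because the Field(name, dataType) constructor initializes complex-type children itself (see the Field.java change at the end of this commit). A minimal sketch of the conversion, assuming convertSparkToCarbonDataType is callable on the object (its visibility is not shown in this diff):

    import org.apache.spark.sql.types._

    // map<string, array<int>> recurses into key and value:
    //   key   -> CarbonDataTypes.STRING
    //   value -> CarbonDataTypes.createArrayType(CarbonDataTypes.INT)
    // and is wrapped via CarbonDataTypes.createMapType(key, value).
    val carbonType = CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(
      MapType(StringType, ArrayType(IntegerType)))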
@@ -32,7 +32,7 @@ import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUn
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
@@ -200,8 +200,10 @@ class SparkCarbonFileFormat extends FileFormat
data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
case s: StructType =>
data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
case s: ArrayType =>
data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
case a: ArrayType =>
data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
case m: MapType =>
data(i) = extractMapData(row.getMap(i), m)
case d: DateType =>
data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
case d: TimestampType =>
@@ -217,6 +219,15 @@
data
}

private def extractMapData(data: AnyRef, mapType: MapType): ArrayObject = {
val mapData = data.asInstanceOf[MapData]
val keys = extractData(mapData.keyArray(), mapType.keyType)
val values = extractData(mapData.valueArray(), mapType.valueType)
new ArrayObject(keys.zip(values).map { case (key, value) =>
new StructObject(Array(key, value))
})
}

private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
dataType match {
case d: DateType =>
@@ -241,8 +252,10 @@ class SparkCarbonFileFormat extends FileFormat
data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
case s: StructType =>
data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
case s: ArrayType =>
data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
case a: ArrayType =>
data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
case m: MapType =>
data(i) = extractMapData(row.getMap(i), m)
case d: DateType =>
data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
case d: TimestampType =>
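For context, a hedged sketch of what the new extractMapData consumes and produces, built with catalyst's ArrayBasedMapData and GenericArrayData (an assumption for illustration; any MapData implementation works, and catalyst actually stores strings as UTF8String):

    import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
    import org.apache.spark.sql.types.{MapType, StringType}

    // A map {"b" -> "c", "d" -> "e"} reaches the writer as parallel key/value arrays.
    val mapData = new ArrayBasedMapData(
      new GenericArrayData(Array[Any]("b", "d")),
      new GenericArrayData(Array[Any]("c", "e")))

    // extractMapData(mapData, MapType(StringType, StringType)) zips the two arrays into
    // an ArrayObject of StructObject(Array(key, value)) pairs -- the array<struct<key,value>>
    // layout CarbonData uses for maps internally.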
@@ -148,7 +148,6 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("describe parquet_table").show(false)
spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:string>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
@@ -212,6 +211,131 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists parquet_table")
}

test("test write with array type with value as nested map type") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, Array(Map("b" -> "c")), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 array<map<string,string>>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}

test("test write with struct type with value as nested map type") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, ("a", Map("b" -> "c")), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:map<string,string>>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}

test("test write with map type") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, Map("b" -> "c"), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}

test("test write with map type with Int data type as key") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, Map(99 -> "c"), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 map<int, string>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}

test("test write with map type with value as nested map type") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, Map("a" -> Map("b" -> "c")), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 map<string, map<string, string>>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}

test("test write with map type with value as nested struct type") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, Map("a" -> ("b", "c")), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 map<string, struct<a1:string, a2:string>>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}

test("test write with map type with value as nested array type") {
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, Map("a" -> Array("b", "c")), x))
.toDF("c1", "c2", "number")

df.write
.format("parquet").saveAsTable("parquet_table")
spark.sql("create table carbon_table(c1 string, c2 map<string, array<string>>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists parquet_table")
}
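Beyond round-trip equality against parquet, the written map columns can be exercised with standard Spark SQL operators (a usage sketch, not part of this commit):

    // against the map<string, string> table from "test write with map type":
    spark.sql("select c2['b'] from carbon_table").show()
    // explode flattens each entry into key/value columns:
    spark.sql("select explode(c2) from carbon_table").show()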

test("test write using ddl and options") {
spark.sql("drop table if exists carbon_table")
@@ -468,7 +468,7 @@ private static Field prepareFields(Schema.Field avroField) {
case MAP:
// recursively get the sub fields
ArrayList<StructField> mapSubFields = new ArrayList<>();
StructField mapField = prepareSubFields("val", childSchema);
StructField mapField = prepareSubFields(fieldName, childSchema);
if (null != mapField) {
// key value field will be wrapped inside a map struct field
StructField keyValueField = mapField.getChildren().get(0);
@@ -575,7 +575,7 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema
keyValueFields.add(keyField);
keyValueFields.add(valueField);
StructField mapKeyValueField =
new StructField(fieldName, DataTypes.createStructType(keyValueFields));
new StructField(fieldName + ".val", DataTypes.createStructType(keyValueFields));
// value dataType will be at position 1 in the fields
MapType mapType =
DataTypes.createMapType(DataTypes.STRING, mapKeyValueField.getDataType());
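The two one-line changes above replace the hardcoded "val" child name with one derived from the parent field, so the synthesized key/value struct of each Avro map is unique per column. The resulting naming for a map field "props" (inferred from the new code; a sketch, not tool output):

    props        the map column itself
    props.val    struct wrapping the key/value pair (previously always named just "val")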
@@ -38,6 +38,7 @@
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.MapType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -673,8 +674,8 @@ private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuil
.addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
} else if (field.getDataType().getName().equalsIgnoreCase("MAP")) {
// Loop through the inner columns for MapType
DataType mapType =
DataTypes.createMapType(DataTypes.STRING, field.getChildren().get(0).getDataType());
DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(),
field.getChildren().get(0).getDataType());
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), mapType), valIndex, false);
}
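The buildTableSchema fix above stops hardcoding the map key type to STRING and instead reads the declared key type off the field's MapType; this is exactly what the new "map type with Int data type as key" test exercises. An illustration (a sketch using the SDK types shown in this diff):

    // for a field declared as map<int, string>:
    //   before: DataTypes.createMapType(DataTypes.STRING, valueType)  // key type lost
    //   after:  DataTypes.createMapType(DataTypes.INT, valueType)     // key type preserved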
59 changes: 59 additions & 0 deletions store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,14 +17,18 @@

package org.apache.carbondata.sdk.file;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.metadata.datatype.ArrayType;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.MapType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.datatype.StructType;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

/**
@@ -130,6 +134,7 @@ public Field(String name, DataType type, List<StructField> fields) {
public Field(String name, DataType type) {
this.name = name;
this.type = type;
initComplexTypeChildren();
}

/**
@@ -218,4 +223,58 @@ public void setColumnComment(String columnComment) {
public void updateNameToLowerCase() {
this.name = name.toLowerCase();
}

private void initComplexTypeChildren() {
if (getDataType().isComplexType()) {
StructField subFields = prepareSubFields(getFieldName(), getDataType());
if (DataTypes.isArrayType(getDataType()) || DataTypes.isMapType(getDataType())) {
children = subFields.getChildren();
} else if (DataTypes.isStructType(getDataType())) {
children = ((StructType) subFields.getDataType()).getFields();
}
}
}

/**
* prepare sub fields for complex types
*
* @param fieldName column name
* @param dataType data type of the column or its children
* @return a StructField tree describing the column and its children
*/
private StructField prepareSubFields(String fieldName, DataType dataType) {
if (DataTypes.isArrayType(dataType)) {
List<StructField> arrayFields = new ArrayList<>();
StructField arrayField = prepareSubFields(fieldName, ((ArrayType) dataType).getElementType());
arrayFields.add(arrayField);
return new StructField(fieldName, DataTypes.createArrayType(arrayField.getDataType()),
arrayFields);
} else if (DataTypes.isStructType(dataType)) {
List<StructField> structFields = new ArrayList<>();
List<StructField> fields = ((StructType) dataType).getFields();
for (StructField field : fields) {
structFields.add(prepareSubFields(field.getFieldName(), field.getDataType()));
}
return new StructField(fieldName, DataTypes.createStructType(structFields), structFields);
} else if (DataTypes.isMapType(dataType)) {
// Internally, Map<key, value> is stored as Array<struct<key, value>>. The code below
// converts a map type into the equivalent field structure, so the columnSchema is
// formed as Map<Struct<key, value>>.
List<StructField> mapFields = new ArrayList<>();
MapType mapType = (MapType) dataType;
// the map key is a primitive type, so its data type can be fetched directly
StructField keyField = new StructField(fieldName + ".key", mapType.getKeyType());
StructField valueField = prepareSubFields(fieldName + ".value", mapType.getValueType());
mapFields.add(keyField);
mapFields.add(valueField);
StructField field =
new StructField(fieldName + ".val", DataTypes.createStructType(mapFields));
MapType mapDataType = DataTypes.createMapType(keyField.getDataType(), field.getDataType());
List<StructField> mapStructField = new ArrayList<>();
mapStructField.add(field);
return new StructField(fieldName, mapDataType, mapStructField);
} else {
return new StructField(fieldName, dataType);
}
}
}
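A usage sketch of the new constructor behavior (Scala calling the SDK's Java API; the child layout is read off prepareSubFields above):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.Field

    val field = new Field("props", DataTypes.createMapType(DataTypes.STRING, DataTypes.STRING))
    // initComplexTypeChildren runs in the constructor, so the field now carries a single
    // child "props.val" of type struct<props.key: string, props.value: string> -- the
    // array<struct<key,value>> representation described in the prepareSubFields comment.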
