[CARBONDATA-3179] Map Data Load Failure and Struct Projection Pushdown Issue

Problem 1: Data load fails for an INSERT INTO ... SELECT from the same table when the table contains a Map datatype.
Solution: The Map type was not handled for this scenario; it is now handled.
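
For illustration, a minimal, self-contained Scala sketch of how a Map value can be flattened into a delimited string during such a re-insert. The object and method names here are hypothetical, and the delimiter list is an assumption taken from the literal \001/\002/\003 separators used in the tests below; this only mirrors the idea of the FieldConverter change, not the production code.

// Sketch only: MapToStringSketch and toDelimitedString are hypothetical names.
object MapToStringSketch {
  // Assumed delimiters: level 0 separates map entries, level 1 separates a key from its value.
  val complexDelimiters = Seq("\u0001", "\u0002", "\u0003")

  def toDelimitedString(value: Any, level: Int): String = value match {
    case m: scala.collection.Map[_, _] =>
      val entryDelimiter = complexDelimiters(level)
      val keyValueDelimiter = complexDelimiters(level + 1)
      m.map { case (k, v) =>
        toDelimitedString(k, level + 2) + keyValueDelimiter + toDelimitedString(v, level + 2)
      }.mkString(entryDelimiter)
    case other => other.toString
  }

  def main(args: Array[String]): Unit = {
    // A map<INT,STRING> row value like the ones loaded in the tests below.
    val row = Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")
    // Prints the entries joined by the level-0 delimiter,
    // each key and value joined by the level-1 delimiter.
    println(toDelimitedString(row, level = 0))
  }
}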

Problem 2: Projection pushdown is not supported for a table containing a Struct of Map.
Solution: Push down only the parent column for projection when the table contains a MapType.
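
A simplified sketch of that decision, using only Spark's type API. pushParentOnly is a hypothetical helper that mirrors (but is not) the check added to CarbonDatasourceHadoopRelation: if a Map or Array appears anywhere under the projected struct field, only the parent column is pushed down and the child access is evaluated on top.

import org.apache.spark.sql.types._

object PushdownSketch {
  // Hypothetical helper: true when the projected field's type contains a MapType
  // or ArrayType, in which case the whole parent column is read.
  def pushParentOnly(dataType: DataType): Boolean = dataType match {
    case _: MapType | _: ArrayType => true
    case StructType(fields) => fields.exists(f => pushParentOnly(f.dataType))
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    // Shape of the "Map inside struct" test table below:
    // structField struct<intVal:INT, map1:MAP<STRING,STRING>>
    val structOfMap = StructType(Seq(
      StructField("intVal", IntegerType),
      StructField("map1", MapType(StringType, StringType))))
    // SELECT structField.intVal cannot be pushed down as a child projection,
    // so the parent column structField is pushed instead.
    println(pushParentOnly(structOfMap)) // prints: true
  }
}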

This closes #2993
manishnalla1994 authored and kumarvishal09 committed Dec 20, 2018
1 parent 34923db commit 96b2ea3
Showing 8 changed files with 150 additions and 66 deletions.
@@ -27,7 +27,6 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk

import scala.collection.JavaConversions._

class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
@@ -471,4 +470,74 @@ class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
"sort_columns is unsupported for map datatype column: mapfield"))
}

test("Data Load Fail Issue") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql(
s"""
| LOAD DATA LOCAL INPATH '$path'
| INTO TABLE carbon OPTIONS(
| 'header' = 'false')
""".stripMargin)
sql("INSERT INTO carbon SELECT * FROM carbon")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")),
Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
))
}

test("Struct inside map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,struct<kk:STRING,mm:STRING>>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
sql("INSERT INTO carbon SELECT * FROM carbon")
checkAnswer(sql("SELECT * FROM carbon limit 1"),
Seq(Row(Map(1 -> Row("man", "nan"), (2 -> Row("kands", "dsnknd"))))))
}

test("Struct inside map pushdown") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,struct<kk:STRING,mm:STRING>>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
checkAnswer(sql("SELECT mapField[1].kk FROM carbon"), Row("man"))
}

test("Map inside struct") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| structField struct<intVal:INT,map1:MAP<STRING,STRING>>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql("INSERT INTO carbon values('1\001man\003nan\002kands\003dsnknd')")
val res = sql("SELECT structField.intVal FROM carbon").show(false)
checkAnswer(sql("SELECT structField.intVal FROM carbon"), Seq(Row(1)))
}

}
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
import java.io.{DataInputStream, InputStreamReader}
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util
import java.util.regex.Pattern

import scala.collection.mutable
@@ -293,11 +294,12 @@ class CarbonBlockDistinctValuesCombineRDD(
row = rddIter.next()
if (row != null) {
rowCount += 1
val complexDelimiters = new util.ArrayList[String]
model.delimiters.foreach(x => complexDelimiters.add(x))
for (i <- 0 until dimNum) {
dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
model.serializationNullFormat,
model.delimiters(0),
model.delimiters(1),
complexDelimiters,
timeStampFormat,
dateFormat))
}
@@ -341,9 +341,7 @@ class NewRddIterator(rddIter: Iterator[Row],
private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
private val complexDelimiters = carbonLoadModel.getComplexDelimiters
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
import scala.collection.JavaConverters._
@@ -357,7 +355,7 @@
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
complexDelimiters, timeStampFormat, dateFormat,
isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
@@ -391,9 +389,7 @@ class LazyRddIterator(serializer: SerializerInstance,
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
private val complexDelimiters = carbonLoadModel.getComplexDelimiters
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// the order of fields in dataframe and createTable may be different, here we need to know whether
@@ -431,7 +427,7 @@ class LazyRddIterator(serializer: SerializerInstance,
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
complexDelimiters, timeStampFormat, dateFormat,
isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
@@ -62,22 +62,22 @@ object CarbonScalaUtil {

def getString(value: Any,
serializationNullFormat: String,
delimiterLevel1: String,
delimiterLevel2: String,
complexDelimiters: util.ArrayList[String],
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
level: Int = 1): String = {
FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1,
delimiterLevel2, timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
level: Int = 0): String = {
FieldConverter.objectToString(value, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
}

/**
* Converts incoming value to String after converting data as per the data type.
* @param value Input value to convert
* @param dataType Datatype to convert and then convert to String
*
* @param value Input value to convert
* @param dataType Datatype to convert and then convert to String
* @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
* @param dateFormat DataFormat to convert in case of DateType datatype
* @param dateFormat DataFormat to convert in case of DateType datatype
* @return converted String
*/
def convertToDateAndTimeFormats(
@@ -126,7 +126,8 @@ object CarbonScalaUtil {

/**
* Converts incoming value to String after converting data as per the data type.
* @param value Input value to convert
*
* @param value Input value to convert
* @param column column which it value belongs to
* @return converted String
*/
@@ -183,7 +184,8 @@ object CarbonScalaUtil {

/**
* Converts incoming value to String after converting data as per the data type.
* @param value Input value to convert
*
* @param value Input value to convert
* @param column column which it value belongs to
* @return converted String
*/
@@ -238,6 +240,7 @@ object CarbonScalaUtil {

/**
* Update partition values as per the right date and time format
*
* @return updated partition spec
*/
def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
@@ -466,7 +469,7 @@
}
} catch {
case e: Exception =>
// ignore it
// ignore it
}
}

@@ -983,15 +983,14 @@ object CarbonDataRDDFactory {
// generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
// input data from DataFrame
val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
val complexDelimiters = carbonLoadModel.getComplexDelimiters
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
if (null != row && row.length > partitionColumnIndex &&
null != row.get(partitionColumnIndex)) {
(CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
complexDelimiters, timeStampFormat, dateFormat), row)
} else {
(null, row)
}
@@ -24,7 +24,7 @@ import scala.util.control.Breaks._
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetStructField, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetMapValue, GetStructField, NamedExpression}
import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
@@ -101,32 +101,41 @@ case class CarbonDatasourceHadoopRelation(
if (!complexFilterExists.exists(f => f.contains(true))) {
var parentColumn = new ListBuffer[String]
// In case of Struct or StructofStruct Complex type, get the project column for given
// parent/child field and pushdown the corresponding project column. In case of Array,
// ArrayofStruct or StructofArray, pushdown parent column
// parent/child field and pushdown the corresponding project column. In case of Array, Map,
// ArrayofStruct, StructofArray, MapOfStruct or StructOfMap, pushdown parent column
var reqColumns = projects.map {
case a@Alias(s: GetStructField, name) =>
var arrayTypeExists = false
var ifGetArrayItemExists = s
var arrayOrMapTypeExists = false
var ifGetArrayOrMapItemExists = s
breakable({
while (ifGetArrayItemExists.containsChild != null) {
if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
while (ifGetArrayOrMapItemExists.containsChild != null) {
if (ifGetArrayOrMapItemExists.childSchema.toString().contains("ArrayType") ||
ifGetArrayOrMapItemExists.childSchema.toString().contains("MapType")) {
arrayOrMapTypeExists = true
break
}
if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
arrayTypeExists = s.childSchema.toString().contains("ArrayType")
if (ifGetArrayOrMapItemExists.child.isInstanceOf[AttributeReference]) {
arrayOrMapTypeExists = s.childSchema.toString().contains("ArrayType") ||
s.childSchema.toString().contains("MapType")
break
} else {
if (ifGetArrayItemExists.child.isInstanceOf[GetArrayItem]) {
arrayTypeExists = true
if (ifGetArrayOrMapItemExists.child.isInstanceOf[GetArrayItem] ||
ifGetArrayOrMapItemExists.child.isInstanceOf[GetMapValue]) {
arrayOrMapTypeExists = true
break
} else {
ifGetArrayItemExists = ifGetArrayItemExists.child.asInstanceOf[GetStructField]
if (ifGetArrayOrMapItemExists.child.isInstanceOf[GetStructField]) {
ifGetArrayOrMapItemExists = ifGetArrayOrMapItemExists.child
.asInstanceOf[GetStructField]
} else {
arrayOrMapTypeExists = true
break
}
}
}
}
})
if (!arrayTypeExists) {
if (!arrayOrMapTypeExists) {
parentColumn += s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
parentColumn = parentColumn.distinct
s.toString().replaceAll("#[0-9]*", "").toLowerCase
@@ -19,6 +19,7 @@ package org.apache.carbondata.streaming.parser

import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util

import org.apache.carbondata.core.constants.CarbonCommonConstants

@@ -28,8 +29,7 @@ object FieldConverter {
* Return a String representation of the input value
* @param value input value
* @param serializationNullFormat string for null value
* @param delimiterLevel1 level 1 delimiter for complex type
* @param delimiterLevel2 level 2 delimiter for complex type
* @param complexDelimiters List of Complex Delimiters
* @param timeStampFormat timestamp format
* @param dateFormat date format
* @param isVarcharType whether it is varchar type. A varchar type has no string length limit
@@ -38,12 +38,11 @@
def objectToString(
value: Any,
serializationNullFormat: String,
delimiterLevel1: String,
delimiterLevel2: String,
complexDelimiters: util.ArrayList[String],
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
level: Int = 1): String = {
level: Int = 0): String = {
if (value == null) {
serializationNullFormat
} else {
@@ -66,30 +65,35 @@
case bs: Array[Byte] => new String(bs,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
case s: scala.collection.Seq[Any] =>
val delimiter = if (level == 1) {
delimiterLevel1
} else {
delimiterLevel2
}
val delimiter = complexDelimiters.get(level)
val builder = new StringBuilder()
s.foreach { x =>
builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat, isVarcharType, level + 1))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
// First convert the 'key' of the Map and append the keyValueDelimiter, then convert
// the 'value' of the Map and append the delimiter
case m: scala.collection.Map[_, _] =>
throw new Exception("Unsupported data type: Map")
case r: org.apache.spark.sql.Row =>
val delimiter = if (level == 1) {
delimiterLevel1
} else {
delimiterLevel2
val delimiter = complexDelimiters.get(level)
val keyValueDelimiter = complexDelimiters.get(level + 1)
val builder = new StringBuilder()
m.foreach { x =>
builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat, isVarcharType, level + 2))
.append(keyValueDelimiter)
builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat, isVarcharType, level + 2))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
case r: org.apache.spark.sql.Row =>
val delimiter = complexDelimiters.get(level)
val builder = new StringBuilder()
for (i <- 0 until r.length) {
builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
builder.append(objectToString(r(i), serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat, isVarcharType, level + 1))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
