Skip to content

Commit

Permalink
Merge fc5073f into 0e467eb
Browse files Browse the repository at this point in the history
  • Loading branch information
manishnalla1994 committed Dec 17, 2018
2 parents 0e467eb + fc5073f commit 72087bc
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 47 deletions.
Expand Up @@ -27,7 +27,6 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk

import scala.collection.JavaConversions._

class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
Expand Down Expand Up @@ -442,4 +441,45 @@ class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
"sort_columns is unsupported for map datatype column: mapfield"))
}

test("Data Load Fail Issue") {
  // Start from a clean slate so the test is re-runnable.
  sql("DROP TABLE IF EXISTS carbon")
  // Table with a map column of primitive key/value types.
  sql(
    s"""
       | CREATE TABLE carbon(
       | mapField map<INT,STRING>
       | )
       | STORED BY 'carbondata'
       | """.stripMargin)
  // Load the CSV fixture, then duplicate the data via a self insert-select so
  // both the file-load and the insert write paths are exercised.
  sql(
    s"""
       | LOAD DATA LOCAL INPATH '$path'
       | INTO TABLE carbon OPTIONS(
       | 'header' = 'false')
     """.stripMargin)
  sql("INSERT INTO carbon SELECT * FROM carbon")
  // Each source row must appear twice: once from the load, once from the insert.
  val threeEntryRow = Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar"))
  val fourEntryRow =
    Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
  checkAnswer(sql("select * from carbon"),
    Seq(threeEntryRow, threeEntryRow, fourEntryRow, fourEntryRow))
}

// Smoke test for a struct nested inside a map value: verifies that a literal
// INSERT and a self INSERT-SELECT both run without error.
// NOTE(review): this test only calls .show(false) and asserts nothing about
// the stored rows — consider adding a checkAnswer on the expected content.
test("Struct inside map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,struct<kk:STRING,mm:STRING>>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
// The literal appears to use \001 between map entries, \002 between key and
// struct value, and \003 between struct fields — TODO confirm these match the
// loader's complex-delimiter levels.
sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
sql("SELECT * FROM carbon").show(false)
sql("INSERT INTO carbon SELECT * FROM carbon")
sql("SELECT * FROM carbon limit 2").show(false)
}

}
Expand Up @@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
import java.io.{DataInputStream, InputStreamReader}
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util
import java.util.regex.Pattern

import scala.collection.mutable
Expand Down Expand Up @@ -293,11 +294,12 @@ class CarbonBlockDistinctValuesCombineRDD(
row = rddIter.next()
if (row != null) {
rowCount += 1
val complexDelimiters = new util.ArrayList[String]
model.delimiters.foreach(x => complexDelimiters.add(x))
for (i <- 0 until dimNum) {
dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
model.serializationNullFormat,
model.delimiters(0),
model.delimiters(1),
complexDelimiters,
timeStampFormat,
dateFormat))
}
Expand Down
Expand Up @@ -339,9 +339,7 @@ class NewRddIterator(rddIter: Iterator[Row],
private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
private val complexDelimiters = carbonLoadModel.getComplexDelimiters
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
import scala.collection.JavaConverters._
Expand All @@ -355,7 +353,7 @@ class NewRddIterator(rddIter: Iterator[Row],
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
complexDelimiters, timeStampFormat, dateFormat,
isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
Expand Down Expand Up @@ -389,9 +387,7 @@ class LazyRddIterator(serializer: SerializerInstance,
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
private val complexDelimiters = carbonLoadModel.getComplexDelimiters
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// the order of fields in dataframe and createTable may be different, here we need to know whether
Expand Down Expand Up @@ -429,7 +425,7 @@ class LazyRddIterator(serializer: SerializerInstance,
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
complexDelimiters, timeStampFormat, dateFormat,
isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
Expand Down
Expand Up @@ -62,14 +62,19 @@ object CarbonScalaUtil {

/**
 * Renders a value as the textual form used by CarbonData's load path,
 * delegating to FieldConverter.objectToString.
 *
 * @param value the value to convert; may be null
 * @param serializationNullFormat string emitted in place of a null value
 * @param complexDelimiters delimiters applied per nesting level of complex types
 * @param timeStampFormat format applied to timestamp values
 * @param dateFormat format applied to date values
 * @param isVarcharType true when the column is varchar (no string length limit)
 * @param level current complex-type nesting depth; 0 for the outermost level
 * @return the string representation of `value`
 */
def getString(value: Any,
    serializationNullFormat: String,
    complexDelimiters: util.ArrayList[String],
    timeStampFormat: SimpleDateFormat,
    dateFormat: SimpleDateFormat,
    isVarcharType: Boolean = false,
    level: Int = 0): String = {
  FieldConverter.objectToString(value, serializationNullFormat, complexDelimiters,
    timeStampFormat, dateFormat, isVarcharType = isVarcharType, level = level)
}

/**
Expand Down
Expand Up @@ -989,15 +989,14 @@ object CarbonDataRDDFactory {
// generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
// input data from DataFrame
val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
val complexDelimiters = carbonLoadModel.getComplexDelimiters
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
if (null != row && row.length > partitionColumnIndex &&
null != row.get(partitionColumnIndex)) {
(CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
complexDelimiters, timeStampFormat, dateFormat), row)
} else {
(null, row)
}
Expand Down
Expand Up @@ -19,6 +19,7 @@ package org.apache.carbondata.streaming.parser

import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util

import org.apache.carbondata.core.constants.CarbonCommonConstants

Expand All @@ -28,8 +29,7 @@ object FieldConverter {
* Return a String representation of the input value
* @param value input value
* @param serializationNullFormat string for null value
* @param delimiterLevel1 level 1 delimiter for complex type
* @param delimiterLevel2 level 2 delimiter for complex type
* @param complexDelimiters List of Complex Delimiters
* @param timeStampFormat timestamp format
* @param dateFormat date format
* @param isVarcharType whether it is varchar type. A varchar type has no string length limit
Expand All @@ -38,12 +38,11 @@ object FieldConverter {
def objectToString(
value: Any,
serializationNullFormat: String,
delimiterLevel1: String,
delimiterLevel2: String,
complexDelimiters: util.ArrayList[String],
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
level: Int = 1): String = {
level: Int = 0): String = {
if (value == null) {
serializationNullFormat
} else {
Expand All @@ -66,30 +65,57 @@ object FieldConverter {
case bs: Array[Byte] => new String(bs,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
case s: scala.collection.Seq[Any] =>
val delimiter = if (level == 1) {
delimiterLevel1
} else {
delimiterLevel2
}
val delimiter = complexDelimiters.get((level))
val builder = new StringBuilder()
s.foreach { x =>
builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
builder
.append(objectToString(x,
serializationNullFormat,
complexDelimiters,
timeStampFormat,
dateFormat,
isVarcharType,
level + 1))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
case m: scala.collection.Map[_, _] =>
throw new Exception("Unsupported data type: Map")
case r: org.apache.spark.sql.Row =>
val delimiter = if (level == 1) {
delimiterLevel1
} else {
delimiterLevel2
val delimiter = complexDelimiters.get(level)
val keyValueDelimiter = complexDelimiters.get(level + 1)
val builder = new StringBuilder()
m.foreach { x =>
builder
.append(objectToString(x._1,
serializationNullFormat,
complexDelimiters,
timeStampFormat,
dateFormat,
isVarcharType,
level + 2))
.append(keyValueDelimiter)
builder
.append(objectToString(x._2,
serializationNullFormat,
complexDelimiters,
timeStampFormat,
dateFormat,
isVarcharType,
level + 2))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
case r: org.apache.spark.sql.Row =>
val delimiter = complexDelimiters.get(level)
val builder = new StringBuilder()
for (i <- 0 until r.length) {
builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
builder
.append(objectToString(r(i),
serializationNullFormat,
complexDelimiters,
timeStampFormat,
dateFormat,
isVarcharType,
level + 1))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.carbondata.streaming.parser

import java.text.SimpleDateFormat
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
Expand All @@ -26,6 +27,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.processing.loading
import org.apache.carbondata.processing.loading.ComplexDelimitersEnum
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants

/**
Expand All @@ -39,9 +42,7 @@ class RowStreamParserImp extends CarbonStreamParser {

var timeStampFormat: SimpleDateFormat = null
var dateFormat: SimpleDateFormat = null
var complexDelimiterLevel1: String = null
var complexDelimiterLevel2: String = null
var complexDelimiterLevel3: String = null
var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
var serializationNullFormat: String = null

override def initialize(configuration: Configuration, structType: StructType): Unit = {
Expand All @@ -53,19 +54,21 @@ class RowStreamParserImp extends CarbonStreamParser {
this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
this.dateFormat = new SimpleDateFormat(
this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
this.complexDelimiterLevel3 = this.configuration.get("carbon_complex_delimiter_level_3")
this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_1"))
this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_2"))
this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_3"))
this.complexDelimiters.add(ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
this.serializationNullFormat =
this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
}

override def parserRow(value: InternalRow): Array[Object] = {
this.encoder.fromRow(value).toSeq.map { x => {
FieldConverter.objectToString(
x, serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2,
x, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat)
} }.toArray
}
}.toArray
}

override def close(): Unit = {
Expand Down

0 comments on commit 72087bc

Please sign in to comment.