[CARBONDATA-4210] Handle 3.1 parsing failures related to alter complex types

Why is this PR needed?
For Spark 2.3 and 2.4, parsing of ALTER commands is done by Spark itself, which is not the case for Spark 3.1.

What changes were proposed in this PR?
Carbon is now responsible for parsing these ALTER commands on Spark 3.1.
Test cases previously ignored because of this issue are now enabled.
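For example, this is the kind of DDL Carbon must now parse itself on Spark 3.1. A minimal sketch, assuming the CarbonData jars are on the classpath and that org.apache.spark.sql.CarbonExtensions is the extension class in use; the statements are taken from the tests enabled below:

  import org.apache.spark.sql.SparkSession

  // Sketch only: session setup is illustrative, not part of this commit.
  val spark = SparkSession.builder()
    .appName("alter-complex-column-example")
    .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
    .getOrCreate()

  spark.sql("create table complextable (id string, country array<string>, columnName string) stored as carbondata")
  // On Spark 3.1 these ALTER statements are parsed by Carbon after this change:
  spark.sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
  spark.sql("alter table complextable change newArray arr2 array<string>")
  spark.sql("alter table complextable change columnName name string")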

This closes #4162
akkio-97 authored and kunal642 committed Jul 14, 2021
1 parent 88fdf60 commit 02e77234ebb9c1e18e194c1844aaf2d4a2473dcc
Showing 6 changed files with 390 additions and 367 deletions.
@@ -46,158 +46,142 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
}

test("Test restructured array<string> and existing string column as index columns on SI with compaction") {
// TODO: Support alter change column for complex type for SPARK 3.1.1
// REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
if (!sqlContext.sparkContext.version.startsWith("3.1")) {
sql("drop table if exists complextable")
sql("create table complextable (id string, country array<string>, columnName string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")
sql("drop table if exists complextable")
sql("create table complextable (id string, country array<string>, columnName string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")

sql("drop index if exists index_11 on complextable")
sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
sql("alter table complextable change newArray arr2 array<string>")
sql("alter table complextable change columnName name string")
sql("insert into complextable select 3,array('china'), 'f',array('hello','world')")
sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis')")
sql("drop index if exists index_11 on complextable")
sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
sql("alter table complextable change newArray arr2 array<string>")
sql("alter table complextable change columnName name string")
sql("insert into complextable select 3,array('china'), 'f',array('hello','world')")
sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis')")

checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
val result1 = sql("select * from complextable where array_contains(arr2,'iron') and name='g'")
val result2 = sql("select * from complextable where arr2[0]='iron' and name='f'")
sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(arr2,'iron') and name='g'")
val df2 = sql(" select * from complextable where arr2[0]='iron' and name='f'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
val doNotHitSIDf = sql(" select * from complextable where array_contains(arr2,'iron') and array_contains(arr2,'man')")
if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result1, df1)
checkAnswer(result2, df2)
checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
val result1 = sql("select * from complextable where array_contains(arr2,'iron') and name='g'")
val result2 = sql("select * from complextable where arr2[0]='iron' and name='f'")
sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(arr2,'iron') and name='g'")
val df2 = sql(" select * from complextable where arr2[0]='iron' and name='f'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
val doNotHitSIDf = sql(" select * from complextable where array_contains(arr2,'iron') and array_contains(arr2,'man')")
if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result1, df1)
checkAnswer(result2, df2)
}

test("Test restructured array<string> and string columns as index columns on SI with compaction") {
// TODO: Support alter change column for complex type for SPARK 3.1.1
// REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
if (!sqlContext.sparkContext.version.startsWith("3.1")) {
sql("drop table if exists complextable")
sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")
sql("drop table if exists complextable")
sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")

sql("drop index if exists index_11 on complextable")
sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
sql("alter table complextable change newArray arr2 array<string>")
sql("ALTER TABLE complextable ADD COLUMNS(address string)")
sql("alter table complextable change address addr string")
sql("insert into complextable select 3,array('china'), 'f',array('hello','world'),'china'")
sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis'),'India'")
sql("drop index if exists index_11 on complextable")
sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
sql("alter table complextable change newArray arr2 array<string>")
sql("ALTER TABLE complextable ADD COLUMNS(address string)")
sql("alter table complextable change address addr string")
sql("insert into complextable select 3,array('china'), 'f',array('hello','world'),'china'")
sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis'),'India'")

checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
val result1 = sql("select * from complextable where array_contains(arr2,'iron') and addr='India'")
val result2 = sql("select * from complextable where arr2[0]='iron' and addr='china'")
sql("create index index_11 on table complextable(arr2, addr) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(arr2,'iron') and addr='India'")
val df2 = sql(" select * from complextable where arr2[0]='iron' and addr='china'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
val doNotHitSIDf = sql(" select * from complextable where array_contains(arr2,'iron') and array_contains(arr2,'man')")
if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result1, df1)
checkAnswer(result2, df2)
checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
val result1 = sql("select * from complextable where array_contains(arr2,'iron') and addr='India'")
val result2 = sql("select * from complextable where arr2[0]='iron' and addr='china'")
sql("create index index_11 on table complextable(arr2, addr) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(arr2,'iron') and addr='India'")
val df2 = sql(" select * from complextable where arr2[0]='iron' and addr='china'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
val doNotHitSIDf = sql(" select * from complextable where array_contains(arr2,'iron') and array_contains(arr2,'man')")
if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result1, df1)
checkAnswer(result2, df2)
}

test("test array<string> on secondary index with compaction") {
// TODO: Support alter change column for complex type for SPARK 3.1.1
// REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
if (!sqlContext.sparkContext.version.startsWith("3.1")) {
sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")
sql("insert into complextable select 3,array('china'), 'f'")
sql("insert into complextable select 4,array('india'),'g'")
sql("alter table complextable change columnCountry country array<string>")
val result1 = sql(" select * from complextable where array_contains(country,'china')")
val result2 = sql(" select * from complextable where country[0]='china'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(country,'china')")
val df2 = sql(" select * from complextable where country[0]='china'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
val doNotHitSIDf = sql(" select * from complextable where array_contains(country,'china') and array_contains(country,'us')")
if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result1, df1)
checkAnswer(result2, df2)
sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")
sql("insert into complextable select 3,array('china'), 'f'")
sql("insert into complextable select 4,array('india'),'g'")
sql("alter table complextable change columnCountry country array<string>")
val result1 = sql(" select * from complextable where array_contains(country,'china')")
val result2 = sql(" select * from complextable where country[0]='china'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df1 = sql(" select * from complextable where array_contains(country,'china')")
val df2 = sql(" select * from complextable where country[0]='china'")
if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
val doNotHitSIDf = sql(" select * from complextable where array_contains(country,'china') and array_contains(country,'us')")
if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result1, df1)
checkAnswer(result2, df2)
}

test("test array<string> and string as index columns on secondary index with compaction") {
// TODO: Support alter change column for complex type for SPARK 3.1.1
// REFER: https://issues.apache.org/jira/browse/CARBONDATA-4210
if (!sqlContext.sparkContext.version.startsWith("3.1")) {
sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
sql("insert into complextable select 1, array('china', 'us'), 'b'")
sql("insert into complextable select 2, array('pak'), 'v'")
sql("insert into complextable select 3, array('china'), 'f'")
sql("insert into complextable select 4, array('india'),'g'")
sql("alter table complextable change columnCountry country array<string>")
val result = sql(" select * from complextable where array_contains(country,'china') and name='f'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country, name) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df = sql(" select * from complextable where array_contains(country,'china') and name='f'")
if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result, df)
sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
sql("insert into complextable select 1, array('china', 'us'), 'b'")
sql("insert into complextable select 2, array('pak'), 'v'")
sql("insert into complextable select 3, array('china'), 'f'")
sql("insert into complextable select 4, array('india'),'g'")
sql("alter table complextable change columnCountry country array<string>")
val result = sql(" select * from complextable where array_contains(country,'china') and name='f'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country, name) as 'carbondata'")
sql("alter table complextable compact 'minor'")
val df = sql(" select * from complextable where array_contains(country,'china') and name='f'")
if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
assert(false)
} else {
assert(true)
}
checkAnswer(result, df)
}

test("test load data with array<string> on secondary index") {
@@ -1110,6 +1110,53 @@ object CarbonParserUtil {
}
}

/**
* This method will parse the given data type and validate against the allowed data types
*
* @param complexField datatype structure(only complex) given by the user in DDL
* @param values values defined when the decimal datatype is given in DDL
* @return DataTypeInfo object with datatype, precision and scale
*/
def parseDataType(
    columnName: String,
    complexField: Field,
    values: Option[List[(Int, Int)]]): DataTypeInfo = {
  val dataTypeName = DataTypeConverterUtil.convertToCarbonType(complexField).getName
  val dataTypeInfo = CarbonParserUtil.parseDataType(columnName, dataTypeName.toLowerCase, values)
  complexField.dataType match {
    case Some(CarbonCommonConstants.ARRAY) =>
      val childField = complexField.children.get(0)
      val childType = childField.dataType
      val childName = columnName + CarbonCommonConstants.POINT + childField.name
      val childValues = childType match {
        case d: DecimalType => Some(List((d.precision, d.scale)))
        case _ => None
      }
      val childDatatypeInfo = parseDataType(childName, childField, childValues)
      dataTypeInfo.setChildren(List(childDatatypeInfo))
    case Some(CarbonCommonConstants.STRUCT) =>
      var childTypeInfoList: List[DataTypeInfo] = null
      for (childField <- complexField.children.get) {
        val childType = childField.dataType
        val childName = columnName + CarbonCommonConstants.POINT + childField.name.get
        val childValues = childType match {
          case d: DecimalType => Some(List((d.precision, d.scale)))
          case _ => None
        }
        val childDatatypeInfo = parseDataType(childName, childField, childValues)
        if (childTypeInfoList == null) {
          childTypeInfoList = List(childDatatypeInfo)
        } else {
          childTypeInfoList = childTypeInfoList :+ childDatatypeInfo
        }
      }
      dataTypeInfo.setChildren(childTypeInfoList)
    case _ =>
  }
  // TODO have to handle for map types [CARBONDATA-4199]
  dataTypeInfo
}
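For illustration, a minimal sketch of the recursion performed above, using simplified stand-ins for Field and DataTypeInfo (the real CarbonData classes carry more state; these case classes and the buildTypeInfo helper are illustrative only):

  // Hypothetical, simplified stand-ins for CarbonData's Field and DataTypeInfo.
  case class SimpleField(name: String, dataType: String, children: List[SimpleField] = Nil)
  case class SimpleTypeInfo(columnName: String, dataType: String, children: List[SimpleTypeInfo] = Nil)

  // Depth-first walk of a complex field: each child's column name is the parent
  // name plus "." plus the child name, mirroring CarbonCommonConstants.POINT above.
  def buildTypeInfo(columnName: String, field: SimpleField): SimpleTypeInfo = {
    val childInfos = field.children.map { child =>
      buildTypeInfo(columnName + "." + child.name, child)
    }
    SimpleTypeInfo(columnName, field.dataType, childInfos)
  }

  // arr2 array<struct<a:int, b:string>> produces a three-level type-info tree.
  val example = SimpleField("arr2", "array", List(
    SimpleField("val", "struct", List(
      SimpleField("a", "int"),
      SimpleField("b", "string")))))
  buildTypeInfo("arr2", example)

The real method additionally resolves decimal precision and scale for child fields and, per the TODO above, defers map types to CARBONDATA-4199.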

/**
* This method will parse the given data type and validate against the allowed data types
*
@@ -17,6 +17,8 @@

package org.apache.spark.sql.parser

import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -629,14 +631,32 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
commandOptions)
}


protected lazy val alterTableColumnRenameAndModifyDataType: Parser[LogicalPlan] =
-  ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~ ident ~
-    opt("(" ~> rep1sep(valueOptions, ",") <~ ")") ~ opt(COMMENT ~> restInput) <~ opt(";") ^^ {
-    case dbName ~ table ~ columnName ~ columnNameCopy ~ dataType ~ values ~
-        comment if CarbonPlanHelper.isCarbonTable(TableIdentifier(table, dbName)) =>
+  ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~
+    opt(primitiveTypes) ~ opt(nestedType) ~ opt(COMMENT ~> restInput) <~ opt(";") ^^ {
+    case dbName ~ table ~ columnName ~ newColumnName ~ dataType ~ complexField ~
+        comment if CarbonPlanHelper.isCarbonTable(TableIdentifier(table, dbName)) &&
+        (complexField.isDefined ^ dataType.isDefined) =>
+      var primitiveType = dataType
+      var newComment: Option[String] = comment
+      var decimalValues = None: Option[List[(Int, Int)]]
+      // if datatype is decimal then extract precision and scale
+      if (!dataType.equals(None) && dataType.get.contains(CarbonCommonConstants.DECIMAL)) {
+        val matcher = Pattern.compile("[0-9]+").matcher(dataType.get)
+        val list = collection.mutable.ListBuffer.empty[Int]
+        while ( { matcher.find }) {
+          list += matcher.group.toInt
+        }
+        decimalValues = Some(List((list(0), list(1))))
+        primitiveType = Some(CarbonCommonConstants.DECIMAL)
+      }
+      newComment = if (comment.isDefined) {
+        Some(StringUtils.substringBetween(comment.get, "'", "'"))
+      } else { None }

      CarbonSparkSqlParserUtil.alterTableColumnRenameAndModifyDataType(
-       dbName, table, columnName, columnNameCopy, dataType, values, comment)
+       dbName, table, columnName, newColumnName, primitiveType, decimalValues, newComment,
+       complexField)
  }
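The decimal handling in the new rule pulls precision and scale out of the raw type string with a plain digit regex. A standalone sketch of that extraction (the helper name extractPrecisionAndScale is illustrative, not part of the commit):

  import java.util.regex.Pattern

  // Extract (precision, scale) from a raw type string such as "decimal(10,2)",
  // mirroring the matcher loop in the rule above.
  def extractPrecisionAndScale(dataType: String): Option[(Int, Int)] = {
    if (!dataType.toLowerCase.startsWith("decimal")) {
      None
    } else {
      val matcher = Pattern.compile("[0-9]+").matcher(dataType)
      val digits = scala.collection.mutable.ListBuffer.empty[Int]
      while (matcher.find()) {
        digits += matcher.group.toInt
      }
      if (digits.length >= 2) Some((digits(0), digits(1))) else None
    }
  }

  extractPrecisionAndScale("decimal(10,2)")  // Some((10, 2))
  extractPrecisionAndScale("string")         // None

Unlike this sketch, the parser code reads list(0) and list(1) directly, since the grammar has already ensured a well-formed decimal(p,s) token at that point.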

protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
