update should support limit 1 sub query
ajantha-bhat committed Dec 24, 2019
1 parent 3098fec commit d33f727
Showing 2 changed files with 83 additions and 6 deletions.
@@ -80,6 +80,25 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table iud.dest11""").show
   }

test("update with subquery having limit 1") {
sql("drop table if exists t1")
sql("drop table if exists t2")
sql("create table t1 (age int, name string) stored by 'carbondata'")
sql("insert into t1 select 1, 'aa'")
sql("insert into t1 select 3, 'bb'")
sql("create table t2 (age int, name string) stored by 'carbondata'")
sql("insert into t2 select 3, 'Andy'")
sql("insert into t2 select 2, 'Andy'")
sql("insert into t2 select 1, 'aa'")
sql("insert into t2 select 3, 'aa'")
sql("update t1 set (age) = " +
"(select t2.age from t2 where t2.name = 'Andy' order by age limit 1) " +
"where t1.age = 1 ").show(false)
checkAnswer(sql("select * from t1"), Seq(Row(2,"aa"), Row(3,"bb")))
sql("drop table if exists t1")
sql("drop table if exists t2")
}

test("update carbon table[using destination table columns with where and exist]") {
sql("""drop table if exists iud.dest22""")
sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
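Note: the test above depends on the parser change in CarbonSpark2SqlParser (next file). Because the subquery reads only t2, it is evaluated on its own and its single result row is inlined as a constant. As an illustration (not part of the commit): with the data inserted above, the limit-1 subquery yields 2, so the statement behaves as if it had been written as

    sql("update t1 set (age) = ('2') where t1.age = 1")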
@@ -20,7 +20,8 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions

-import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, UpdateTable}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{CarbonToSparkAdapter, Dataset, DeleteRecords, ProjectForUpdate, SparkSession, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -31,7 +32,7 @@ import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnC
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
@@ -247,8 +248,61 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       case tab ~ columns ~ rest =>
         val (sel, where) = splitQuery(rest)
         val selectPattern = """^\s*select\s+""".r
+        // comma separated constant values, filled in when the subquery
+        // can be evaluated independently of the main table
+        var constants: String = ""
+        if (selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
+          // the assigned value is a subquery (it starts with select)
+          val mainTableName = tab._4.table
+          val mainTableAlias = if (tab._3.isDefined) {
+            tab._3.get
+          } else {
+            ""
+          }
+          val session = SparkSession.getActiveSession.get
+          val subQueryUnresolvedLogicalPlan = session.sessionState.sqlParser.parsePlan(sel)
+          var isJoinWithMainTable: Boolean = false
+          var isLimitPresent: Boolean = false
+          // a filter that references the main table (by name or alias) means the
+          // subquery is correlated with the table being updated
+          subQueryUnresolvedLogicalPlan collect {
+            case f: Filter =>
+              f.condition.collect {
+                case attr: UnresolvedAttribute =>
+                  if ((!StringUtils.isEmpty(mainTableAlias) &&
+                       attr.nameParts.head.equalsIgnoreCase(mainTableAlias)) ||
+                      attr.nameParts.head.equalsIgnoreCase(mainTableName)) {
+                    isJoinWithMainTable = true
+                  }
+              }
+            case _: GlobalLimit =>
+              isLimitPresent = true
+          }
+          if (isJoinWithMainTable && isLimitPresent) {
+            throw new UnsupportedOperationException(
+              "Update subquery has a join with the main table and a limit, which " +
+              "leads to one join per limit row")
+          }
+          if (!isJoinWithMainTable) {
+            // uncorrelated subquery: resolve and optimize it on its own, then run
+            // it eagerly; should go as a value update, not as a join update
+            val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(
+              session.sessionState.analyzer, subQueryUnresolvedLogicalPlan)
+            val subQueryLogicalPlan = session.sessionState.optimizer.execute(analyzedPlan)
+            val df = Dataset.ofRows(session, subQueryLogicalPlan)
+            val rowsCount = df.count()
+            if (rowsCount == 0L) {
+              // subquery returned no rows: update the column to null
+              constants = "null"
+            } else if (rowsCount != 1) {
+              throw new UnsupportedOperationException(
+                "update cannot be supported for 1 to N mapping, as more than one value is " +
+                "present for the update key")
+            } else {
+              // exactly one row: inline its values as quoted constants
+              constants = "'" + df.collect()(0).toSeq.mkString("','") + "'"
+            }
+          }
+        }
         val (selectStmt, relation) =
-          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
+          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
+              !StringUtils.isEmpty(constants)) {
             if (sel.trim.isEmpty) {
               sys.error("At least one source column has to be specified ")
             }
@@ -262,12 +316,16 @@ }
               }
             case _ => tab._1
           }

+            val newSel = if (!StringUtils.isEmpty(constants)) {
+              constants
+            } else {
+              sel
+            }
             tab._3 match {
               case Some(a) =>
-                ("select " + sel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
+                ("select " + newSel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
               case None =>
-                ("select " + sel + " from " + getTableName(tab._2), relation)
+                ("select " + newSel + " from " + getTableName(tab._2), relation)
             }

         } else {
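For reference, here is a minimal self-contained sketch of the technique the parser now applies: evaluate an uncorrelated limit-1 subquery eagerly and inline its result as literals. The object name, local session and parquet table below are illustrative assumptions written for plain Spark, not CarbonData code:

    import org.apache.spark.sql.SparkSession

    object SubQueryToLiteralSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("subquery-to-literal")
          .getOrCreate()

        spark.sql("create table t2 (age int, name string) using parquet")
        spark.sql("insert into t2 values (3, 'Andy'), (2, 'Andy'), (1, 'aa'), (3, 'aa')")

        // Run the subquery on its own, as the commit does via the analyzer, the
        // optimizer and Dataset.ofRows, then branch on the number of rows returned.
        val rows = spark.sql(
          "select age from t2 where name = 'Andy' order by age limit 1").collect()
        val constants = rows.length match {
          case 0 => "null" // no rows: the update writes null
          case 1 => "'" + rows(0).toSeq.mkString("','") + "'" // one row: quoted literals
          case _ => throw new UnsupportedOperationException(
            "update cannot be supported for 1 to N mapping")
        }
        println(constants) // prints: '2'
        spark.stop()
      }
    }

The parser splices this comma separated string into the generated source select (here "select '2' from t1"), so the statement runs as a plain value update rather than a join update.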
