Skip to content

Commit

Permalink
update should support LIMIT 1 subquery
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Dec 23, 2019
1 parent 3098fec commit 1cf49e8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,25 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("""drop table iud.dest11""").show
}

test("update with subquery having limit 1") {
  // Verify that an UPDATE whose source is a scalar subquery
  // (ORDER BY ... LIMIT 1) succeeds and applies the single resolved value.
  Seq("t1", "t2").foreach(table => sql(s"drop table if exists $table"))
  sql("create table t1 (age int, name string) stored by 'carbondata'")
  Seq((1, "aa"), (3, "bb")).foreach { case (age, name) =>
    sql(s"insert into t1 select $age, '$name'")
  }
  sql("create table t2 (age int, name string) stored by 'carbondata'")
  Seq((3, "Andy"), (2, "Andy"), (1, "aa"), (3, "aa")).foreach { case (age, name) =>
    sql(s"insert into t2 select $age, '$name'")
  }
  // The subquery picks the smallest age for name 'Andy' (i.e. 2), so only the
  // t1 row with age = 1 should be rewritten to age = 2.
  sql("update t1 set (age) = " +
      "(select t2.age from t2 where t2.name = 'Andy' order by age limit 1) " +
      "where t1.age = 1 ").show(false)
  checkAnswer(sql("select * from t1"), Seq(Row(2, "aa"), Row(3, "bb")))
  // Clean up the scratch tables.
  Seq("t1", "t2").foreach(table => sql(s"drop table if exists $table"))
}

test("update carbon table[using destination table columns with where and exist]") {
sql("""drop table if exists iud.dest22""")
sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
import scala.language.implicitConversions

import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, UpdateTable}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, SparkSession, UpdateTable}
import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.plans.logical._
Expand All @@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
import org.apache.spark.sql.util.CarbonException
Expand Down Expand Up @@ -247,8 +249,50 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
case tab ~ columns ~ rest =>
val (sel, where) = splitQuery(rest)
val selectPattern = """^\s*select\s+""".r
// comma separated string
var constants : String = ""
if (selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
val df = SparkSession
.getActiveSession
.get
.sql(sel)
var isJoinPresent : Boolean = false
var isLimitPresent : Boolean = false
var isFilterPresent : Boolean = false
val subQueryLogicalPlan = df.queryExecution.optimizedPlan
subQueryLogicalPlan collect {
case _: Join =>
isJoinPresent = true
// optimized plan will have join node if filter has two different relation
case _: Filter =>
isFilterPresent = true
case _: GlobalLimit =>
isLimitPresent = true
}
if (isJoinPresent && isLimitPresent) {
throw new UnsupportedOperationException(
"Update subquery has join with limit which leads to multiple join for each limit " +
"rows")
}
if (!isJoinPresent || !isFilterPresent) {
// should go as value update, not as join update.
// TODO: may be apply having rows > 1 filter before collecting, use df.count()
val rows = df.collect()
// TODO: if length = 0, update to null ??
if (rows.length != 1) {
throw new UnsupportedOperationException(
" update cannot be supported for 1 to N mapping, as more than one value present " +
"for the update key")
}
for (i <- 0 until rows(0).length - 1) {
constants += rows(0).get(i) + ","
}
constants += rows(0).get(rows(0).length - 1)
}
}
val (selectStmt, relation) =
if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
!StringUtils.isEmpty(constants)) {
if (sel.trim.isEmpty) {
sys.error("At least one source column has to be specified ")
}
Expand All @@ -262,12 +306,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
case _ => tab._1
}

val newSel = if (!StringUtils.isEmpty(constants)) {
constants
} else {
sel
}
tab._3 match {
case Some(a) =>
("select " + sel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
("select " + newSel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
case None =>
("select " + sel + " from " + getTableName(tab._2), relation)
("select " + newSel + " from " + getTableName(tab._2), relation)
}

} else {
Expand Down

0 comments on commit 1cf49e8

Please sign in to comment.