[SPARK-36680][SQL] Supports Dynamic Table Options for Spark SQL #41683

Status: Closed · wants to merge 5 commits
common/utils/src/main/resources/error/error-classes.json (10 additions & 0 deletions)

@@ -3443,6 +3443,16 @@
"Window function <funcName> requires an OVER clause."
]
},
"WITH_OPTIONS_EXPECTED_SIMPLE_TABLE" : {
"message" : [
"`with_options` function only handles a simple table argument, \"TABLE($t)\"."
]
},
"WITH_OPTIONS_EXPECTED_TABLE" : {
"message" : [
"`with_options` function requires a table argument, \"TABLE($t)\"."
]
},
"WRITE_STREAM_NOT_ALLOWED" : {
"message" : [
"`writeStream` can be called only on streaming Dataset/DataFrame."
docs/sql-error-conditions.md (12 additions & 0 deletions)

@@ -2137,6 +2137,18 @@

SQLSTATE: none assigned

Window function `<funcName>` requires an OVER clause.

### WITH_OPTIONS_EXPECTED_SIMPLE_TABLE

SQLSTATE: none assigned

`with_options` function only handles a simple table argument, "TABLE($t)".

### WITH_OPTIONS_EXPECTED_TABLE

SQLSTATE: none assigned

`with_options` function requires a table argument, "TABLE($t)".

### WRITE_STREAM_NOT_ALLOWED

SQLSTATE: none assigned
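The two error conditions above map to the argument validation in `with_options`. A hedged sketch (names like `cat.db.t` are illustrative and `spark` is an active `SparkSession`) of queries that succeed and queries that raise each error, mirroring the negative tests later in this diff:

```scala
// OK: a bare TABLE(...) argument naming a DataSourceV2 table.
spark.sql("SELECT * FROM with_options(TABLE(cat.db.t), map('split-size', '5'))")

// WITH_OPTIONS_EXPECTED_TABLE: the first argument is not a table argument at all.
spark.sql("SELECT * FROM with_options(array('cat.db.t'), map('split-size', '5'))")

// WITH_OPTIONS_EXPECTED_SIMPLE_TABLE: TABLE(...) wraps a subquery rather than
// a plain table reference.
spark.sql("SELECT * FROM with_options(TABLE(SELECT * FROM cat.db.t), map('split-size', '5'))")
```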
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

@@ -301,6 +301,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveRelationsWithOptions ::
      ResolveTableSpec ::
      ResolveAliases ::
      ResolveSubquery ::
@@ -3768,6 +3769,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
    }
  }

  object ResolveRelationsWithOptions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      case RelationWithOptions(child) => child
    }
  }

  /**
   * A rule to handle special commands that need to be notified when analysis is done. This rule
   * should run after all other analysis rules are run.
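A hedged before/after sketch (plan shapes illustrative, names hypothetical) of what `ResolveRelationsWithOptions` does: the `with_options` call parses into a `RelationWithOptions` marker node whose child already carries the merged read options, and the rule strips the marker so the rest of analysis sees a plain relation:

```
== before ResolveRelationsWithOptions ==
RelationWithOptions
+- SubqueryAlias cat.db.t
   +- DataSourceV2Relation cat.db.t, options: [split-size=5]

== after ==
SubqueryAlias cat.db.t
+- DataSourceV2Relation cat.db.t, options: [split-size=5]
```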
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.xml._
import org.apache.spark.sql.catalyst.plans.logical.{FunctionBuilderBase, Generate, LogicalPlan, OneRowRelation, Range}
import org.apache.spark.sql.catalyst.plans.logical.{FunctionBuilderBase, Generate, LogicalPlan, OneRowRelation, Range, RelationWithOptions}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
@@ -1066,6 +1066,7 @@ object TableFunctionRegistry

  val logicalPlans: Map[String, (ExpressionInfo, TableFunctionBuilder)] = Map(
    logicalPlan[Range]("range"),
    logicalPlan[RelationWithOptions]("with_options"),
    generatorBuilder("explode", ExplodeGeneratorBuilder),
    generatorBuilder("explode_outer", ExplodeOuterGeneratorBuilder),
    generator[Inline]("inline"),
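Explanatory note (not part of the diff): `logicalPlan[RelationWithOptions]("with_options")` registers a builder that binds the SQL arguments to the class's constructors via reflection, so the auxiliary `this(table: Expression, options: Expression)` constructor on `RelationWithOptions` below is what makes the two-argument call form resolvable; invoking the function with any other arity fails with the "requires 2 parameters" error exercised in the tests at the end of this diff.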
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.{AliasIdentifier, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
@@ -31,8 +33,10 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.Utils
import org.apache.spark.util.random.RandomSampler

@@ -1136,6 +1140,58 @@ case class Range(
  }
}

@ExpressionDescription(
  usage = """_FUNC_(table: String, options: Map) -
    Returns the data source relation with the given options.
    table must be a simple TABLE() parameter.""",
  examples = """
    Examples:
      > SELECT * FROM _FUNC_(TABLE(cat.db.table), map('split-size','5'));
       1,a
  """,
  since = "4.0.0",
  group = "table_funcs")
case class RelationWithOptions(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = Nil

  def this(table: Expression, options: Expression) = {
    this(RelationWithOptions.withOptions(table, options))
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(child = newChild)

  override lazy val resolved = expressions.forall(_.resolved) && childrenResolved
}

object RelationWithOptions {
  def withOptions(tableExpr: Expression, options: Expression): LogicalPlan = {
    val relationOptions = ExprUtils.convertToMapData(options)

    if (!tableExpr.isInstanceOf[FunctionTableSubqueryArgumentExpression]) {
      throw QueryCompilationErrors.withOptionsExpectedTableError()
    }
    val table = tableExpr.asInstanceOf[FunctionTableSubqueryArgumentExpression]

    table.plan match {
      // Support only a direct call to TABLE(t1).
      // Support only DataSourceV2Relation, as it's the only relation that carries options.
      case t @ SubqueryAlias(_, r @ DataSourceV2Relation(_, _, _, _, options)) =>
        t.copy(child = r.copy(options = merge(options, relationOptions)))
      case _ => throw QueryCompilationErrors.withOptionsExpectedSimpleTableError()
    }
  }

  def merge(original: CaseInsensitiveStringMap, newMap: Map[String, String]):
      CaseInsensitiveStringMap = {
    val map = new java.util.HashMap[String, String](original)
    map.putAll(newMap.asJava)
    new CaseInsensitiveStringMap(map)
  }
}

/**
* This is a Group by operator with the aggregate functions and projections.
*
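A minimal sketch of the `merge` semantics defined above (assuming the catalyst classes from this diff are on the classpath): options passed to `with_options` override the relation's existing options on key collision, and lookups stay case-insensitive:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.plans.logical.RelationWithOptions
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// The relation's original read options.
val original = new CaseInsensitiveStringMap(Map("split-size" -> "10").asJava)

// Options supplied via with_options(..., map('split-size', '5')) win on collision.
val merged = RelationWithOptions.merge(original, Map("split-size" -> "5"))

assert(merged.get("split-size") == "5")  // new value overrides the old one
assert(merged.get("SPLIT-SIZE") == "5")  // lookups stay case-insensitive
```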
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

@@ -1974,6 +1974,20 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
    )
  }

  def withOptionsExpectedSimpleTableError(): Throwable = {
    new AnalysisException(
      errorClass = "WITH_OPTIONS_EXPECTED_SIMPLE_TABLE",
      messageParameters = Map.empty
    )
  }

  def withOptionsExpectedTableError(): Throwable = {
    new AnalysisException(
      errorClass = "WITH_OPTIONS_EXPECTED_TABLE",
      messageParameters = Map.empty
    )
  }

  def identifierTooManyNamePartsError(originalIdentifier: String): Throwable = {
    new AnalysisException(
      errorClass = "IDENTIFIER_TOO_MANY_NAME_PARTS",
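A test-style sketch of what these helpers produce (illustrative; `QueryCompilationErrors` is `private[sql]`, so this only compiles inside an `org.apache.spark.sql` package, as the suites below do):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors

val e = QueryCompilationErrors.withOptionsExpectedTableError()
// The message text is resolved from the new error-classes.json entries.
assert(e.isInstanceOf[AnalysisException])
assert(e.getMessage.contains("requires a table argument"))
```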
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

@@ -3295,6 +3295,54 @@ class DataSourceV2SQLSuiteV1Filter
    }
  }

test("SPARK-36680: Supports Dynamic Table Options for Spark SQL") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b')")

val df = sql(s"SELECT * FROM with_options(TABLE ($t1), map('split-size', '5'))")
val collected = df.queryExecution.optimizedPlan.collect {
case scan: DataSourceV2ScanRelation =>
assert(scan.relation.options.get("split-size") == "5")
}
assert (collected.size == 1)
checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
}

// negative tests
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")

val notEnoughArgs = intercept[AnalysisException](sql(s"SELECT * FROM with_options('$t1')"))
assert(notEnoughArgs.message.contains(
"The `with_options` requires 2 parameters but the actual number is 1"))

val wrongFirstArg = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(foo, map('split-size','5'))"))
assert(wrongFirstArg.message.contains(
"A column, variable, or function parameter with name `foo` cannot be resolved"
))

val wrongFirstArg2 = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(array('$t1'), map('split-size','5'))"))
assert(wrongFirstArg2.message.contains(
"`with_options` function requires a table argument, \"TABLE($t)\""
))

val wrongSecondArg = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(TABLE ($t1), array('split-size', '5'))"))
assert(wrongSecondArg.message.contains(
"Must use the `map()` function for options"))

val unsupportedTableCall = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(TABLE (SELECT * FROM range(0,1))" +
s", map('split-size','5'))"))
assert(unsupportedTableCall.message.contains(
"`with_options` function only handles a simple table argument, \"TABLE($t)\""))
}
}

  private def testNotSupportedV2Command(
      sqlCommand: String,
      sqlParams: String,
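For comparison (not part of this diff), the DataFrame API already accepts per-read options, which is the behavior this PR exposes to SQL. A hedged sketch using the same hypothetical option and table names as above:

```scala
// Roughly equivalent to:
//   SELECT * FROM with_options(TABLE(cat.db.table), map('split-size', '5'))
// `split-size` is an illustrative data source option, as in the tests above.
val df = spark.read
  .option("split-size", "5")
  .table("cat.db.table")
```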