[SPARK-39607][SQL][DSV2] Distribution and ordering support V2 function in writing #36995

Closed
@@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.analysis

import java.lang.reflect.{Method, Modifier}
import java.util
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean
@@ -47,8 +46,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition}
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -2336,33 +2334,7 @@ class Analyzer(override val catalogManager: CatalogManager)
throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
scalarFunc.name(), "IGNORE NULLS")
} else {
val declaredInputTypes = scalarFunc.inputTypes().toSeq
val argClasses = declaredInputTypes.map(ScalaReflection.dataTypeJavaClass)
findMethod(scalarFunc, MAGIC_METHOD_NAME, argClasses) match {
case Some(m) if Modifier.isStatic(m.getModifiers) =>
StaticInvoke(scalarFunc.getClass, scalarFunc.resultType(),
MAGIC_METHOD_NAME, arguments, inputTypes = declaredInputTypes,
propagateNull = false, returnNullable = scalarFunc.isResultNullable,
isDeterministic = scalarFunc.isDeterministic)
case Some(_) =>
val caller = Literal.create(scalarFunc, ObjectType(scalarFunc.getClass))
Invoke(caller, MAGIC_METHOD_NAME, scalarFunc.resultType(),
arguments, methodInputTypes = declaredInputTypes, propagateNull = false,
returnNullable = scalarFunc.isResultNullable,
isDeterministic = scalarFunc.isDeterministic)
case _ =>
// TODO: handle functions defined in Scala too - in Scala, even if a
// subclass do not override the default method in parent interface
// defined in Java, the method can still be found from
// `getDeclaredMethod`.
findMethod(scalarFunc, "produceResult", Seq(classOf[InternalRow])) match {
case Some(_) =>
ApplyFunctionExpression(scalarFunc, arguments)
case _ =>
failAnalysis(s"ScalarFunction '${scalarFunc.name()}' neither implement" +
s" magic method nor override 'produceResult'")
}
}
V2ExpressionUtils.resolveScalarFunction(scalarFunc, arguments)
}
}

@@ -2377,23 +2349,6 @@ class Analyzer(override val catalogManager: CatalogManager)
val aggregator = V2Aggregator(aggFunc, arguments)
aggregator.toAggregateExpression(u.isDistinct, u.filter)
}

/**
* Check if the input `fn` implements the given `methodName` with parameter types specified
* via `argClasses`.
*/
private def findMethod(
fn: BoundFunction,
methodName: String,
argClasses: Seq[Class[_]]): Option[Method] = {
val cls = fn.getClass
try {
Some(cls.getDeclaredMethod(methodName, argClasses: _*))
} catch {
case _: NoSuchMethodException =>
None
}
}
}

/**
@@ -17,13 +17,17 @@

package org.apache.spark.sql.catalyst.expressions

import java.lang.reflect.{Method, Modifier}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
@@ -52,8 +56,11 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
/**
* Converts the array of input V2 [[V2SortOrder]] into their counterparts in catalyst.
*/
def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan): Seq[SortOrder] = {
ordering.map(toCatalyst(_, query).asInstanceOf[SortOrder])
def toCatalystOrdering(
ordering: Array[V2SortOrder],
query: LogicalPlan,
funCatalogOpt: Option[FunctionCatalog] = None): Seq[SortOrder] = {
ordering.map(toCatalyst(_, query, funCatalogOpt).asInstanceOf[SortOrder])
}

def toCatalyst(
@@ -143,4 +150,53 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
case V2NullOrdering.NULLS_FIRST => NullsFirst
case V2NullOrdering.NULLS_LAST => NullsLast
}

def resolveScalarFunction(
scalarFunc: ScalarFunction[_],
arguments: Seq[Expression]): Expression = {
val declaredInputTypes = scalarFunc.inputTypes().toSeq
val argClasses = declaredInputTypes.map(ScalaReflection.dataTypeJavaClass)
findMethod(scalarFunc, MAGIC_METHOD_NAME, argClasses) match {
case Some(m) if Modifier.isStatic(m.getModifiers) =>
StaticInvoke(scalarFunc.getClass, scalarFunc.resultType(),
MAGIC_METHOD_NAME, arguments, inputTypes = declaredInputTypes,
propagateNull = false, returnNullable = scalarFunc.isResultNullable,
isDeterministic = scalarFunc.isDeterministic)
case Some(_) =>
val caller = Literal.create(scalarFunc, ObjectType(scalarFunc.getClass))
Invoke(caller, MAGIC_METHOD_NAME, scalarFunc.resultType(),
arguments, methodInputTypes = declaredInputTypes, propagateNull = false,
returnNullable = scalarFunc.isResultNullable,
isDeterministic = scalarFunc.isDeterministic)
case _ =>
// TODO: handle functions defined in Scala too - in Scala, even if a
// subclass do not override the default method in parent interface
// defined in Java, the method can still be found from
// `getDeclaredMethod`.
findMethod(scalarFunc, "produceResult", Seq(classOf[InternalRow])) match {
case Some(_) =>
ApplyFunctionExpression(scalarFunc, arguments)
case _ =>
throw new AnalysisException(s"ScalarFunction '${scalarFunc.name()}'" +
s" neither implement magic method nor override 'produceResult'")
}
}
}

/**
* Check if the input `fn` implements the given `methodName` with parameter types specified
* via `argClasses`.
*/
private def findMethod(
fn: BoundFunction,
methodName: String,
argClasses: Seq[Class[_]]): Option[Method] = {
val cls = fn.getClass
try {
Some(cls.getDeclaredMethod(methodName, argClasses: _*))
} catch {
case _: NoSuchMethodException =>
None
}
}
}
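
For readers new to the magic-method convention this relies on, here is a minimal, hypothetical ScalarFunction (the name strlen and its types are made up for illustration) showing which branch of resolveScalarFunction would pick it up:

```scala
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical string-length function, only to illustrate the lookup order in
// resolveScalarFunction. Because `invoke` is an instance method here, the analyzer
// would bind it via the Invoke branch; a static `invoke` (e.g. defined in Java)
// would select StaticInvoke, and a function that only overrides produceResult
// would fall back to ApplyFunctionExpression.
class StrLen extends ScalarFunction[Integer] {
  override def name(): String = "strlen"
  override def inputTypes(): Array[DataType] = Array(DataTypes.StringType)
  override def resultType(): DataType = DataTypes.IntegerType

  // Magic method (ScalarFunction.MAGIC_METHOD_NAME = "invoke"): its parameter types
  // must match what ScalaReflection.dataTypeJavaClass derives from inputTypes,
  // here UTF8String for StringType.
  def invoke(str: UTF8String): Int = str.numChars()
}
```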
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -48,6 +48,10 @@ case class DataSourceV2Relation(

import DataSourceV2Implicits._

lazy val funCatalog: Option[FunctionCatalog] = catalog.collect {
case c: FunctionCatalog => c
}

override lazy val metadataOutput: Seq[AttributeReference] = table match {
case hasMeta: SupportsMetadataColumns =>
val resolve = conf.resolver
@@ -17,22 +17,33 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, SortOrder, TransformExpression, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, RepartitionByExpression, Sort}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.FunctionCatalog
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
import org.apache.spark.sql.connector.distributions._
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
import org.apache.spark.sql.errors.QueryCompilationErrors

object DistributionAndOrderingUtils {

def prepareQuery(write: Write, query: LogicalPlan): LogicalPlan = write match {
def prepareQuery(
Member

Hmm, I wonder how the write works with transforms such as bucket. For example, suppose the required distribution is bucket(col, 100): Spark currently computes the partition (bucket) ID as murmur_hash(bucket(col, 100)) pmod 100, so the value of col is essentially hashed twice. I'm not sure whether this breaks any assumption on the V2 data source side, or whether it has any effect on the hash key distribution.

Member Author

Spark currently computes the partition (bucket) ID as murmur_hash(bucket(col, 100)) pmod 100

That only happens on the V1 write path. In V2, the bucket is resolved as a BucketTransform during analysis and converted to an evaluable catalyst expression (ApplyFunctionExpression/Invoke/StaticInvoke) here, so I don't think your concern applies.

Member

Hmm, yes, it will be converted to ApplyFunctionExpression etc. But in the V2 write path, these partition transforms will be used in RepartitionByExpression and then converted into a ShuffleExchangeExec during physical planning, right?

Checking the code path: the ApplyFunctionExpression / Invoke / StaticInvoke, etc., will first be passed to RepartitionByExpression as the partitionExpressions field, then to HashPartitioning, RangePartitioning, etc., in HasPartitionExpressions, and eventually used in places like HashPartitioning.partitionIdExpression, where the partition ID computation I mentioned above happens.

Member Author (@pan3793, Aug 2, 2022)

Thanks for explaining; I learned something. After reading the code, I think you are right that "the value of col is essentially hashed twice", but I don't think it causes correctness issues, because it still guarantees that the same values are clustered into the same partition.
One example is Hive bucketing: in V1WritesUtils#getWriterBucketSpec, either HiveHash or HashPartitioning#partitionIdExpression can be used to construct the bucketIdExpression.
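
To make the double-hashing point concrete, a standalone toy sketch in plain Scala (the hash functions below are placeholders, not Spark's murmur3):

```scala
// Toy stand-in for the two hashing steps discussed above; these are NOT Spark's actual
// hash functions, just enough to show why double hashing keeps a bucket in one partition.
def bucket(value: Long, numBuckets: Int): Int =
  Math.floorMod(java.lang.Long.hashCode(value), numBuckets)   // the V2 bucket transform

def partitionId(bucketId: Int, numPartitions: Int): Int =
  Math.floorMod(Integer.hashCode(bucketId), numPartitions)    // the shuffle hashes the bucket id again

// Rows with equal col values always share a bucket id, hence also a partition id,
// so one bucket still lands in exactly one write task even though col is hashed twice.
assert(bucket(42L, 100) == bucket(42L, 100))
assert(partitionId(bucket(42L, 100), 8) == partitionId(bucket(42L, 100), 8))
```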

Member

I'm just thinking about whether we should enforce stronger semantics on the bucketing. If we build bucketed table support for the DSv2 file source on top of this mechanism, does that mean a particular row could be hashed to a different bucket ID than in V1? What if someone first writes a bucketed table using V2, then reads it back in V1, and perhaps does a bucket join with another V1 bucketed table? I wonder if that could cause incorrect results.

Member Author

Given that different storage systems usually define their own hash functions and sharding (bucketing) rules, I don't think we can make bucket tables of all data sources fully compatible w/ each other; e.g. Spark, Hive and Iceberg use totally different hash algorithms for bucketing.

We can make the DS V1 file bucket table compatible w/ the V2 file bucket table, and the Hive bucket table compatible w/ the V2 Hive bucket table, but a DS file bucket table cannot be compatible w/ Hive (neither V1 nor V2) since they use different hash algorithms.

In detail, since V2SessionCatalog extends FunctionCatalog, to make V2 file bucket tables compatible w/ V1 we can introduce and register a SparkBucket V2 function in V2SessionCatalog that uses the same hash algorithm as V1. For Hive tables, just register a similar HiveBucket V2 function in HiveCatalog that uses the Hive hash algorithm.
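
A minimal sketch of that idea, assuming a hypothetical spark_bucket function; its argument types and placeholder hash are illustrative only, not an API proposed by this PR:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructType}

// Hypothetical "spark_bucket" V2 function that a session catalog could register.
// The hash below is a placeholder; a real SparkBucket would have to reuse the exact
// hash V1 bucketing uses so that V1 and V2 file bucket tables stay compatible.
object SparkBucketFunction extends UnboundFunction {
  override def name(): String = "spark_bucket"
  override def description(): String = "spark_bucket(numBuckets, col): bucket id of col"

  override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Integer] {
    override def name(): String = "spark_bucket"
    override def inputTypes(): Array[DataType] = Array(DataTypes.IntegerType, DataTypes.LongType)
    override def resultType(): DataType = DataTypes.IntegerType

    // No magic `invoke` method, so the analyzer falls back to produceResult via
    // ApplyFunctionExpression (see resolveScalarFunction in this PR).
    override def produceResult(input: InternalRow): Integer = {
      val numBuckets = input.getInt(0)
      val value = input.getLong(1)
      Integer.valueOf(Math.floorMod(java.lang.Long.hashCode(value), numBuckets))
    }
  }
}
```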

Contributor

I think @sunchao brings up a valid point that is easy to overlook. We have to make sure Spark writes to Hive tables in the same way no matter whether the v1 or v2 path is being used.

Would it be correct to say we have this issue because partitionIdExpression in HashPartitioning is used both for generating bucket IDs in Hive tables and for producing partition IDs for writing tasks? Can we use different mechanisms?

Member Author

Since SPARK-32709 (#33432), different hash expressions can be used for generating partition IDs and bucket IDs.

Contributor

If we build bucketed table support for the DSv2 file source on top of this mechanism, does that mean a particular row could be hashed to a different bucket ID than in V1? What if someone first writes a bucketed table using V2, then reads it back in V1, and perhaps does a bucket join with another V1 bucketed table?

@sunchao, coming back to the use case you mentioned above. I think the bucket ID will always be the same as long as the task writer respects the table bucket spec and Spark shuffles all records that are supposed to land in one bucket to one task. Like @pan3793 said, I suppose the V2 file source will request a distribution using a V2 function that would wrap the internal Spark hash function. That should guarantee that all records for the same bucket will land in one task. As long as the task writer uses the correct bucket expression (based on the table definition), we should be good, right?

You are right that the partition/task ID during writes for a particular row may differ between V1 and V2 because of double hashing. But does it actually matter?

Member

Sorry for the late reply here. Yes, I think it should be OK as long as the task writer uses the bucket expression to decide which bucket the input record should go to. There will be some extra work to achieve V1 and V2 compatibility, but that's not really relevant to this PR now.

write: Write,
query: LogicalPlan,
funCatalogOpt: Option[FunctionCatalog]): LogicalPlan = write match {
case write: RequiresDistributionAndOrdering =>
val numPartitions = write.requiredNumPartitions()

val distribution = write.requiredDistribution match {
case d: OrderedDistribution => toCatalystOrdering(d.ordering(), query)
Contributor

What was the behavior before this PR? Do we fail to translate the v2 function, or do we fail at runtime complaining that some expression can't be evaluated?

Member Author

The former: the query fails w/ org.apache.spark.sql.AnalysisException: ${expr} is not currently supported.

case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query)).toSeq
case d: OrderedDistribution =>
toCatalystOrdering(d.ordering(), query, funCatalogOpt)
.map(e => resolveTransformExpression(e).asInstanceOf[SortOrder])
case d: ClusteredDistribution =>
d.clustering.map(e => toCatalyst(e, query, funCatalogOpt))
.map(e => resolveTransformExpression(e)).toSeq
case _: UnspecifiedDistribution => Seq.empty[Expression]
}

@@ -53,16 +64,33 @@ object DistributionAndOrderingUtils {
query
}

val ordering = toCatalystOrdering(write.requiredOrdering, query)
val ordering = toCatalystOrdering(write.requiredOrdering, query, funCatalogOpt)
val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
Sort(ordering, global = false, queryWithDistribution)
Sort(
ordering.map(e => resolveTransformExpression(e).asInstanceOf[SortOrder]),
global = false,
queryWithDistribution)
} else {
queryWithDistribution
}

queryWithDistributionAndOrdering

// Apply typeCoercionRules since the expressions converted from TransformExpression
// implement ImplicitCastInputTypes
typeCoercionRules.foldLeft(queryWithDistributionAndOrdering)((plan, rule) => rule(plan))
case _ =>
query
}

private def resolveTransformExpression(expr: Expression): Expression = expr.transform {
case TransformExpression(scalarFunc: ScalarFunction[_], arguments, Some(numBuckets)) =>
Contributor

I can't recall the details now. When do we need to translate a v2 transform to TransformExpression?

Member Author (@pan3793, Jul 8, 2022)

TransformExpression was introduced in SPARK-37377 (#35657) to check input partition compatibility; it's constructed in V2ScanPartitioningAndOrdering.

Member Author (@pan3793, Jul 15, 2022)

SPARK-37377 (#35657) changed the common code shared by V2 reading & writing, converting the Transform to a TransformExpression.
If desired, we could skip TransformExpression and resolve Transform to ApplyFunctionExpression / Invoke / StaticInvoke directly in V2 writing, but that would make V2 reading & writing share less code.
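
For orientation, a hypothetical connector-side writer (not part of this PR) showing where such a bucket transform enters this code path: the Write requests a clustered distribution over bucket(100, id), the analyzer represents it as a TransformExpression, and resolveTransformExpression below turns it into an evaluable expression.

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, Expressions, SortOrder}
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering

// Hypothetical V2 write that asks Spark to cluster incoming rows by bucket(100, id);
// the class name and column are made up for illustration.
class BucketedWrite extends RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[V2Expression](Expressions.bucket(100, "id")))

  // No required sort order for this sketch; requiredNumPartitions() keeps its default.
  override def requiredOrdering(): Array[SortOrder] = Array.empty
}
```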

V2ExpressionUtils.resolveScalarFunction(scalarFunc, Seq(Literal(numBuckets)) ++ arguments)
case TransformExpression(scalarFunc: ScalarFunction[_], arguments, None) =>
V2ExpressionUtils.resolveScalarFunction(scalarFunc, arguments)
}

private def typeCoercionRules: List[Rule[LogicalPlan]] = if (conf.ansiEnabled) {
AnsiTypeCoercion.typeCoercionRules
} else {
TypeCoercion.typeCoercionRules
}
}
@@ -20,7 +20,6 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.FunctionCatalog
import org.apache.spark.sql.connector.read.{SupportsReportOrdering, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, UnknownPartitioning}
import org.apache.spark.util.collection.Utils.sequenceToOption
@@ -41,14 +40,9 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelpe

private def partitioning(plan: LogicalPlan) = plan.transformDown {
case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None, _) =>
val funCatalogOpt = relation.catalog.flatMap {
case c: FunctionCatalog => Some(c)
case _ => None
}

val catalystPartitioning = scan.outputPartitioning() match {
case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))
V2ExpressionUtils.toCatalystOpt(_, relation, relation.funCatalog)))
case _: UnknownPartitioning => None
case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
"type: " + p.getClass.getSimpleName)
@@ -43,7 +43,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
case a @ AppendData(r: DataSourceV2Relation, query, options, _, None) =>
val writeBuilder = newWriteBuilder(r.table, options, query.schema)
val write = writeBuilder.build()
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query)
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
a.copy(write = Some(write), query = newQuery)

case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None) =>
@@ -67,7 +67,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
throw QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
}

val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query)
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
o.copy(write = Some(write), query = newQuery)

case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None) =>
@@ -79,7 +79,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
case _ =>
throw QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
}
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query)
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
o.copy(write = Some(write), query = newQuery)

case WriteToMicroBatchDataSource(
@@ -89,14 +89,15 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
val customMetrics = write.supportedCustomMetrics.toSeq
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query)
val funCatalogOpt = relation.flatMap(_.funCatalog)
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, funCatalogOpt)
WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)

case rd @ ReplaceData(r: DataSourceV2Relation, _, query, _, None) =>
val rowSchema = StructType.fromAttributes(rd.dataInput)
val writeBuilder = newWriteBuilder(r.table, Map.empty, rowSchema)
val write = writeBuilder.build()
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query)
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
// project away any metadata columns that could be used for distribution and ordering
rd.copy(write = Some(write), query = Project(rd.dataInput, newQuery))
