
[SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer #30806

Closed

Conversation

aokolnychyi
Contributor

What changes were proposed in this pull request?

This PR adds logic to build logical writes introduced in SPARK-33779.

Note: This PR contains a subset of changes discussed in PR #29066.

Why are the changes needed?

These changes are the next step as discussed in the design doc for SPARK-23889.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

@aokolnychyi
Contributor Author

aokolnychyi commented Dec 16, 2020

I added @rdblue as a co-author of this change, as he addressed the initial comments on my PR and I included his changes.

@@ -94,7 +96,8 @@ case class OverwriteByExpression(
     deleteExpr: Expression,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
Contributor Author

Making this optional allows us to reuse the same plan both before and after we construct a write. None here means the logical write hasn't been constructed yet, which lets us keep the rules in the optimizer idempotent.
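For illustration, a minimal sketch of the idempotency idea (the rule name BuildLogicalWrites and the buildWriteFor helper are hypothetical, not the actual V2Writes implementation from this PR):

    import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.connector.write.Write
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

    object BuildLogicalWrites extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Only commands whose logical write has not been built yet (write = None)
        // are rewritten, so applying the rule a second time is a no-op.
        case a @ AppendData(r: DataSourceV2Relation, _, options, _, None) =>
          a.copy(write = Some(buildWriteFor(r, a.query, options)))
      }

      // Assumed helper: creates a connector Write via the table's WriteBuilder.
      private def buildWriteFor(
          relation: DataSourceV2Relation,
          query: LogicalPlan,
          options: Map[String, String]): Write = ???
    }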

-    AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
+    AppendDataExecV1(
+      v1, writeOptions.asOptions, query,
+      refreshCache(r), write.map(_.asInstanceOf[V1Write])) :: Nil
Contributor Author

I think there is one open point we need to discuss: do we want to always apply the new logic, or should we expose a feature flag and construct logical writes only if the flag is enabled? I'd vote for always constructing writes using the new logic, as it feels quite safe and does not carry the burden of maintaining one more config. In addition, this would allow us to simplify this PR a bit and get rid of optional writes in exec nodes.

Contributor Author

I'd be curious about what everybody thinks here. I will be okay either way.

@aokolnychyi
Contributor Author

cc @dongjoon-hyun @dbtsai @sunchao @rdblue @viirya @cloud-fan @HyukjinKwon @HeartSaVioR @holdenk


override protected def run(): Seq[InternalRow] = {
writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
override protected def buildAndRun(): Seq[InternalRow] = {
Contributor Author

We can get rid of all buildAndRun methods if we are OK to apply the new logic all the time.

Contributor

I'm interested to hear what @dongjoon-hyun thinks about this.

I think we should have a different physical node for each write so that the explain plan shows what is happening. Apart from that, supporting both building the batch write here and building it in the optimizer was mainly about being able to turn this on and off in our environment. I doubt that is needed in other situations.

I think I would be for removing all of the buildAndRun methods and always building the write in the optimizer.

Member

Does it cause many code changes on top of this? If it is not intrusive, it sounds reasonable in that context. I'd give a +1 for the direction, @rdblue.

Contributor Author

Getting rid of buildAndRun would also ensure we don't have to maintain the same logic in two places.

Member

I'm also +1 on getting rid of buildAndRun.

Contributor Author

It sounds like we have an intermediate consensus; I'll update the PR, but we can revisit this once we have more input from others.

@@ -89,6 +90,21 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {

   def table: SupportsWrite
   def writeOptions: CaseInsensitiveStringMap
   def refreshCache: () => Unit
+  def write: Option[V1Write] = None
Contributor Author

Same here: Option[V1Write] can become just V1Write if we are OK to apply the new logic all the time.

val session = sqlContext.sparkSession
// The `plan` is already optimized, we should not analyze and optimize it again.
relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
refreshCache()
Contributor Author

Refresh moved to run.

Member

lgtm

@SparkQA

SparkQA commented Dec 16, 2020

Test build #132896 has finished for PR 30806 at commit ce334cc.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait V2ExistingTableWriteExec extends V2TableWriteExec

@aokolnychyi
Contributor Author

Hm, the failure is a bit weird and does not seem related.

Setting status of ce334cc48c8ea615a2465a9111fd0d9f6f606eb2 to FAILURE with url https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/132896/ and message: 'Build finished. '
FileNotFoundException means that the credentials Jenkins is using is probably wrong. Or the user account does not have write access to the repo.
org.kohsuke.github.GHFileNotFoundException: https://api.github.com/repos/apache/spark/statuses/ce334cc48c8ea615a2465a9111fd0d9f6f606eb2 {"message":"Not Found","documentation_url":"https://docs.github.com/rest/reference/repos#create-a-commit-status"}

@@ -39,6 +39,9 @@ class SparkOptimizer(
     // TODO: move SchemaPruning into catalyst
     SchemaPruning :: V2ScanRelationPushDown :: PruneFileSourcePartitions :: Nil

+  override def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] =
+    V2Writes :: Nil
Contributor

I would probably put this in the early pushdown batch, even though the name doesn't match. The rewrite batch needs to run before this so that writes created by it go through V2Writes afterward. That's the same reason why early pushdown runs after plan rewrites.
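For context, a hedged sketch of what that placement might look like in SparkOptimizer, appending V2Writes to the early scan pushdown rules instead of a separate dataSourceRewriteRules hook (the exact list in the merged change may differ):

    override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
      // TODO: move SchemaPruning into catalyst
      SchemaPruning :: V2ScanRelationPushDown :: V2Writes :: PruneFileSourcePartitions :: Nil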

Contributor Author

Ok, it makes sense after thinking more about it.

@SparkQA

SparkQA commented Dec 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37498/

@github-actions github-actions bot added the SQL label Dec 16, 2020
@SparkQA

SparkQA commented Dec 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37498/

@SparkQA

SparkQA commented Dec 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37502/

@SparkQA

SparkQA commented Dec 16, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37502/

@SparkQA

SparkQA commented Dec 17, 2020

Test build #132900 has finished for PR 30806 at commit db6abdb.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@jzhuge jzhuge left a comment

Looks good overall. Minor suggestions and nits.

@@ -188,15 +189,20 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
         orCreate = orCreate) :: Nil
     }

-    case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
+    case AppendData(r: DataSourceV2Relation, query, writeOptions, _, write) =>
Member

Is write guaranteed not to be None?

How about rewriting this case as follows?

    case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
        _, Some(v1Write: V1Write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
      AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil

    case AppendData(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _),
        query, writeOptions, _, Some(write)) =>
      AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil

It is not exactly the same as the existing code. Some unmatched cases (not sure how many, or if any) will fall through. An exception will be thrown later, instead of right here upon the instance cast or Option.get.

Contributor

+1 to this idea. It's guaranteed that the write will be Some, not None, by the time we reach the planner, so matching Some(write) is better.

It's possible that the implementation declares V1_BATCH_WRITE but doesn't return a V1Write. We should give a clear error message if that happens:

case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
        _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
  if (!write.isInstanceOf[V1Write])  throw ...
  ...

Member

Good idea to add more meaningful exception here.

Contributor Author

I've updated this place. Could you take a look, @jzhuge and @cloud-fan?

Member

Looks great! Thanks for taking care of Overwrite* cases as well.

-    AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
+    AppendDataExecV1(
+      v1, writeOptions.asOptions, query,
+      refreshCache(r), write.get.asInstanceOf[V1Write]) :: Nil
Member

Possible to avoid instance cast? See my suggestion above.

Contributor Author

Done.

-    AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
+    AppendDataExec(
+      v2, writeOptions.asOptions, planLater(query),
+      refreshCache(r), write.get) :: Nil
Member

Possible to avoid Option.get? See my suggestion above.

Contributor Author

Got rid of it.

-  protected def writeWithV1(
-      relation: InsertableRelation,
-      refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
+  protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
Member

Nicely simplified

-      case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
-      case v2 => writeWithV2(v2.buildForBatch())
+    val write = writeBuilder.build()
+    val writtenRows = write match {
Member

Nit: merge line 451-454 into:

          val writtenRows = table.newWriteBuilder(info).build() match {

Contributor Author

@aokolnychyi aokolnychyi Dec 17, 2020

I don't feel strongly about this place and can update it. However, I do prefer to split different logical parts into different variables. Here, I've separated building a logical write from actually writing the records. Let me know what your thoughts are, @jzhuge.

Member

Fine with me

@@ -33,6 +33,11 @@
*/
@Unstable
public interface V1WriteBuilder extends WriteBuilder {
Contributor

This is an unstable API, can we just remove it and only use V1Write?

Contributor Author

I think that should simplify the V1 fallback API. I'll update it, @cloud-fan.

Contributor Author

Got rid of V1WriteBuilder and tried to update docs too. Let me know if I missed places, @cloud-fan.

}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
Contributor Author

@aokolnychyi aokolnychyi Dec 17, 2020

I removed the filter conversion as it is done earlier now.


case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
Contributor Author

I kept the old logic but I am not sure whether we should also normalize filters. Thoughts, @cloud-fan @rdblue?

Contributor

I think we should, to follow DS v1 and file source.

Contributor Author

I've created SPARK-33868 as a follow-up item. I will keep the old behavior in this PR.

@SparkQA

SparkQA commented Dec 17, 2020

Test build #132965 has finished for PR 30806 at commit e6335c0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37568/

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37568/

@aokolnychyi
Contributor Author

Retest this, please.

@SparkQA

SparkQA commented Dec 21, 2020

Test build #133150 has finished for PR 30806 at commit e6335c0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 21, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37750/

@SparkQA

SparkQA commented Dec 21, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37750/

Lead-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Co-authored-by: Ryan Blue <blue@apache.org>
@aokolnychyi
Contributor Author

I think certain checks are expected to fail:

[error] spark-sql: Failed binary compatibility check against org.apache.spark:spark-sql_2.12:3.0.0! Found 1 potential problems (filtered 914)
[error]  * interface org.apache.spark.sql.connector.write.V1WriteBuilder does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder")

Per discussion here.

@SparkQA

SparkQA commented Dec 21, 2020

Test build #133160 has finished for PR 30806 at commit 84241e0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 21, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37759/

case v2Write =>
throw new AnalysisException(
s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}")
Contributor

classOf[V1Write].getName

Contributor Author

Done.

@@ -65,7 +66,8 @@ case class AppendData(
     table: NamedRelation,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
   override def withNewQuery(newQuery: LogicalPlan): AppendData = copy(query = newQuery)
   override def withNewTable(newTable: NamedRelation): AppendData = copy(table = newTable)
Contributor

Shall we add override lazy val resolved = ... && write.isDefined in V2WriteCommand? It's safer to make sure that the analyzer creates the Write object.

Contributor Author

Sounds like a good idea but we actually construct the Write object in the optimizer after the operator optimization is done to ensure we operate on optimal expressions.

Contributor

ah I see, let's leave it then.

Contributor

@cloud-fan cloud-fan left a comment

LGTM

@cloud-fan
Contributor

We need to update project/MimaExcludes.scala to fix the mima errors.
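For reference, the exclusion suggested by the MiMa output quoted earlier would look roughly like this in project/MimaExcludes.scala; the filter string is taken verbatim from the error above, while the surrounding list name here is hypothetical and the exact placement is up to the author:

    import com.typesafe.tools.mima.core._

    // Hypothetical list name; V1WriteBuilder was intentionally removed in this PR.
    lazy val v2WritesExcludes = Seq(
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder")
    )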

@SparkQA

SparkQA commented Dec 21, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37759/

@@ -132,7 +135,8 @@ case class OverwritePartitionsDynamic(
     table: NamedRelation,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
Contributor

Not related to this PR, but I'm wondering whether we should have an optional Scan object in DataSourceV2Relation, instead of a separate logical plan DataSourceV2ScanRelation. It's simpler and consistent with the write logical plans. cc @rdblue

Contributor

I think that's a good idea.

Contributor Author

@aokolnychyi aokolnychyi Dec 22, 2020

Quick question: DataSourceV2Relation is also used inside write nodes like AppendData. If we add an optional scan, will that mean we will leak a read-specific concept into write plans?

cc @rdblue @cloud-fan

Contributor

For AppendData, we intentionally do not treat the table as a child, which means the pushdown rule won't apply for it and the Scan object will always be None in AppendData.
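For illustration, a simplified sketch of why that works (names differ from Spark's actual V2WriteCommand; this is not the exact definition): the target table is held as a plain field rather than a child plan, so optimizer rules that walk children, such as scan pushdown, never visit it.

    import org.apache.spark.sql.catalyst.analysis.NamedRelation
    import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

    trait V2WriteCommandSketch extends Command {
      def table: NamedRelation                              // held as data, not as a child
      def query: LogicalPlan                                // the rows to write
      override def children: Seq[LogicalPlan] = Seq(query)  // only the input query is walked
    }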

@github-actions github-actions bot added the BUILD label Dec 21, 2020
@SparkQA

SparkQA commented Dec 22, 2020

Test build #133168 has finished for PR 30806 at commit 882e321.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Dec 22, 2020

Retest this please.

@SparkQA

SparkQA commented Dec 22, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37784/

@SparkQA

SparkQA commented Dec 22, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37784/

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/37784/

@SparkQA

SparkQA commented Dec 22, 2020

Test build #133186 has finished for PR 30806 at commit 882e321.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/133186/

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 2562183 Dec 22, 2020