
[SPARK-36571][SQL] Add an SQLOverwriteHadoopMapReduceCommitProtocol to support all SQL overwrite write data to staging dir #36056

Conversation

@AngersZhuuuu (Contributor) commented Apr 4, 2022

What changes were proposed in this pull request?

The current data source insert SQL commit protocol has the following problems:
Case a: when job A and job B both write data into partitioned table TBL with different static partitions, they conflict because they use the same temp location ${table_location}/_temporary/0/....; when job A finishes, it cleans up this temp location, which removes job B's temp data and causes job B to fail to write its data.
Case b: with the current dynamic partition insert, if a job is killed while writing data, it leaves data behind in the staging dir under the table path.
Case c: if we use a dynamic partition insert into a new table with a huge number of partitions, we have to move the partition paths one by one. In this case we could simply rename the staging dir to the table path, which is much faster. But to do this, the staging dir must be customizable and must not be placed under the table location.

In this PR, we add a new built-in SQL commit protocol, **SQLOverwriteHadoopMapReduceCommitProtocol**.
It defines a new staging path parallel to the target table path:
`new Path(new Path(targetTablePath).getParent, s".${new Path(targetTablePath).getName}-spark-staging-" + jobId)`
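
For illustration only, a minimal sketch of how such a staging path can be built with Hadoop's `Path` API; the variable values below are placeholders, not code from this PR:

```scala
import java.util.UUID
import org.apache.hadoop.fs.Path

// Hypothetical inputs: the target table location and a job id.
val targetTablePath = "/warehouse/db.db/tbl"
val jobId = UUID.randomUUID().toString

// The staging dir sits next to (not under) the table directory,
// e.g. /warehouse/db.db/.tbl-spark-staging-<jobId>.
val target = new Path(targetTablePath)
val stagingDir = new Path(target.getParent, s".${target.getName}-spark-staging-$jobId")
```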
We only change the behavior of overwrite mode. In overwrite mode:

For a non-partition insert:

1. Spark won't delete the target table location before computing.
2. Before job commit, the data has been computed and stored in the staging dir.
3. Before calling the output committer's `commitJob`, we delete the target table location.
4. After calling the output committer's `commitJob`, we rename the staging dir to the target table location.
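
A minimal sketch of this commit sequence, assuming a Hadoop `FileSystem` handle is available at job-commit time; the method and parameter names are placeholders rather than the actual implementation in this PR:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter}

// Hypothetical sketch of the non-partition overwrite commit sequence.
def commitNonPartitionOverwrite(
    fs: FileSystem,
    committer: OutputCommitter,
    jobContext: JobContext,
    stagingDir: Path,
    targetTablePath: Path): Unit = {
  // The target table location is deleted only now, after all data has
  // already been written under the staging dir.
  fs.delete(targetTablePath, true)
  // The output committer finalizes the job output under the staging dir.
  committer.commitJob(jobContext)
  // Finally, rename the staging dir onto the target table location.
  fs.rename(stagingDir, targetTablePath)
}
```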

For an all-static-partition insert:

1. Spark won't delete the matching partition before computing.
2. Before job commit, the data has been computed and stored in the staging dir.
3. Before calling the output committer's `commitJob`, we delete the matching partition.
4. During `commitJob`, data for partitions with a custom partition path is written to that custom partition path.
5. After calling the output committer's `commitJob`: if a custom partition path is specified, the result data has already been written to it during `commitJob`; otherwise Spark renames the staging dir to the target partition location.
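
The all-static-partition case follows the same pattern. A minimal sketch under the same assumptions as above, where `matchingPartitionPath` and `customPartitionPath` are hypothetical names for the resolved partition location and the optional user-specified location:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter}

// Hypothetical sketch of the all-static-partition overwrite commit sequence.
def commitStaticPartitionOverwrite(
    fs: FileSystem,
    committer: OutputCommitter,
    jobContext: JobContext,
    stagingDir: Path,
    matchingPartitionPath: Path,
    customPartitionPath: Option[Path]): Unit = {
  // Delete the matching partition only after the data has been staged.
  fs.delete(matchingPartitionPath, true)
  // commitJob writes data for a custom partition path directly to that path.
  committer.commitJob(jobContext)
  // Without a custom partition path, the staged data still needs to be
  // renamed to the target partition location.
  if (customPartitionPath.isEmpty) {
    fs.rename(stagingDir, matchingPartitionPath)
  }
}
```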

For a dynamic partition insert: the behavior is the same as SQLHadoopMapReduceCommitProtocol.

For a dynamic partition overwrite in static mode:

1. Spark won't delete the matching partition before computing.
2. Before job commit, the data has been computed and stored in the staging dir.
3. Before calling the output committer's `commitJob`, we delete the matching partition.
4. During `commitJob`, data for partitions with a custom partition path is written to that custom partition path.
5. After calling the output committer's `commitJob`, we rename the normal partition paths under the staging dir to their target locations.
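
For context, this case corresponds to a dynamic-partition INSERT OVERWRITE while `spark.sql.sources.partitionOverwriteMode` is left at its default `static` value; the table and view names below are illustrative:

```scala
// Assumes an active SparkSession (`spark`, `sql`), a partitioned table
// t(c1, p1, p2), and a temp view `temp` providing the input data.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")
sql("INSERT OVERWRITE TABLE t PARTITION (p1, p2) SELECT c1, p1, p2 FROM temp")
```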

Why are the changes needed?

Provide a new built-in SQL commit protocol that can handle the problems mentioned in the PR description.

Does this PR introduce any user-facing change?

Users can set the SQL commit protocol to `org.apache.spark.sql.execution.datasources.SQLOverwriteHadoopMapReduceCommitProtocol` to use a commit protocol with a staging dir.
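
For example, assuming the existing `spark.sql.sources.commitProtocolClass` configuration key is what selects the protocol (as for the current built-in protocols), the setting would look roughly like this:

```scala
// Hypothetical usage; this config key selects the file commit protocol
// used for data source writes.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.execution.datasources.SQLOverwriteHadoopMapReduceCommitProtocol")
```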

How was this patch tested?

Added UT

@AngersZhuuuu (Contributor, Author):

Ping @cloud-fan. In order to support all overwrite cases, Spark can't delete the matching partitions before computing. But we also support custom partition paths, which are handled in the commit protocol, and Spark must delete the matching partitions before the job commit. However:

  1. On the committer side, we don't know the job's details (e.g. whether it is an overwrite, or an overwrite of a non-partitioned table).
  2. Also, the committer side doesn't know how to delete the matching partitions.
  3. Custom partition path overwrite is handled in the commit protocol's `commitJob`.

So the best processing order is:

  1. Do not delete the matching partitions.
  2. Compute the data and write it to the temp path.
  3. Delete the matching partitions.
  4. Call `commitJob`; in this step, custom partition paths are written and the data is committed to the staging dir.
  5. Rename the staging dir to the target location.

@AngersZhuuuu changed the title (dropping the [WIP] prefix) to [SPARK-36571][SQL] Add an SQLOverwriteHadoopMapReduceCommitProtocol to support all SQL overwrite write data to staging dir on Apr 4, 2022
@AngersZhuuuu (Contributor, Author):

Gentle ping @cloud-fan Could you take a look?

@dongjoon-hyun (Member):

Could you rebase this PR once more? I'll review this PR, @AngersZhuuuu .

@dongjoon-hyun (Member) left a comment:

It seems that there are no concurrent query tests in this PR, even though concurrency is mentioned as the main problem in the PR description. Did I understand correctly?

@AngersZhuuuu (Contributor, Author):

> Could you rebase this PR once more? I'll review this PR, @AngersZhuuuu.

Yea!

@AngersZhuuuu (Contributor, Author):

> It seems that there are no concurrent query tests in this PR, even though concurrency is mentioned as the main problem in the PR description. Did I understand correctly?

It's hard to make two concurrent tests commit files at exactly the same time.

@AngersZhuuuu (Contributor, Author):

> Could you rebase this PR once more? I'll review this PR, @AngersZhuuuu.

Conflict resolved

.toDF("c1", "p1").repartition(1)
df.createOrReplaceTempView("temp")
sql("INSERT OVERWRITE TABLE t SELECT * FROM temp")
checkAnswer(sql("SELECT * FROM t"), df)
@AngersZhuuuu (Contributor, Author) commented on this diff:

Before the rename to the target output path, the staging dir contains:

/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/sql/core/spark-warehouse/org.apache.spark.sql.execution.datasources.SQLOverwriteHadoopMapReduceCommitProtocolSuite/.t-spark-staging-2f9f8d83-118a-45f4-86d3-d3b3f68e03fa/
├── _SUCCESS
└── part-00000-2f9f8d83-118a-45f4-86d3-d3b3f68e03fa-c000.snappy.parquet

val df = Seq(1, 2, 3).toDF("c1")
df.createOrReplaceTempView("temp")
sql("INSERT OVERWRITE TABLE t PARTITION (p1 = 1, p2 = 1) SELECT * FROM temp")
checkAnswer(sql("SELECT c1 FROM t WHERE p1 = 1 AND p2 = 1"), df)
@AngersZhuuuu (Contributor, Author) commented on this diff:

Before the rename to the partition's target path, the staging dir contains:

/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/sql/core/spark-warehouse/org.apache.spark.sql.execution.datasources.SQLOverwriteHadoopMapReduceCommitProtocolSuite/.t-spark-staging-4800632c-fd44-4dd4-964c-be1aca8a939d/
├── _SUCCESS
└── p1=1
    └── p2=1
        ├── part-00000-4800632c-fd44-4dd4-964c-be1aca8a939d.c000.snappy.parquet
        └── part-00001-4800632c-fd44-4dd4-964c-be1aca8a939d.c000.snappy.parquet

2 directories, 3 files

df.createOrReplaceTempView("temp")
sql("INSERT OVERWRITE TABLE t SELECT * FROM temp")
checkAnswer(sql("SELECT * FROM t"), df)
checkAnswer(sql("SELECT c1 FROM t WHERE p1 = 1 AND p2 = 1"), Row(1) :: Nil)
@AngersZhuuuu (Contributor, Author) commented on this diff:

Before the move to the target path, the files in the staging dir are:

/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/sql/core/spark-warehouse/org.apache.spark.sql.execution.datasources.SQLOverwriteHadoopMapReduceCommitProtocolSuite/.t-spark-staging-669ea1c6-5eaa-4ae7-a670-0261cbde3318/
├── _SUCCESS
├── p1=1
│   └── p2=1
│       └── part-00000-669ea1c6-5eaa-4ae7-a670-0261cbde3318.c000.snappy.parquet
├── p1=2
│   └── p2=2
│       └── part-00001-669ea1c6-5eaa-4ae7-a670-0261cbde3318.c000.snappy.parquet
└── p1=3
    └── p2=3
        └── part-00001-669ea1c6-5eaa-4ae7-a670-0261cbde3318.c000.snappy.parquet

@dongjoon-hyun (Member):

Thank you, @AngersZhuuuu .

@dongjoon-hyun (Member) left a comment:

Hi, @wangyum and @c21 . WDYT about this PR?

github-actions bot commented Dec 4, 2022

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label on Dec 4, 2022
github-actions bot closed this on Dec 5, 2022