[Concurrency Control] Support concurrent writes #72

Closed
liwensun opened this issue Jun 19, 2019 · 5 comments
Labels
enhancement New feature or request
@liwensun
Contributor

liwensun commented Jun 19, 2019

As of 0.2.0, Delta Lake supports concurrent appends, where an append must only add new data to the table without reading or modifying existing data in any way.

This issue is to track the support of more types of concurrent writes in Delta Lake, where a writer can also read and modify existing data, such as overwrite, delete or update.

Related issues:
#9: Allow concurrent writes to partitions that don't interact with each other
#23: Multi threading support on delta table

@liwensun liwensun added the enhancement New feature or request label Jun 19, 2019
@manuzhang

@liwensun By concurrent appends, do you mean that the following is possible?

import org.apache.spark.sql.SparkSession

object ConcurrencyTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").getOrCreate()

    // Start 10 threads that all append to the same path at once.
    for (_ <- 1 to 10) {
      val thread = new AppendThread(spark)
      thread.start()
    }
  }

  // Each thread appends 5 rows (ids 0 to 4) to /tmp/delta-test.
  class AppendThread(spark: SparkSession) extends Thread {

    override def run(): Unit = {
      val data = spark.range(0, 5)
      data.write.format("delta").mode("append").save("/tmp/delta-test")
    }
  }

}

I see a ProtocolChangedException when running the above code:

Exception in thread "Thread-23" org.apache.spark.sql.delta.ProtocolChangedException: The protocol version of the Delta table has been changed by a concurrent update. Please try the operation again.
Conflicting commit: {"version":0,"timestamp":1561364064793,"operation":"WRITE","operationParameters":{"mode":Append,"partitionBy":[]},"isBlindAppend":true}

Sorry for the noise if I'm mistaken about the concept here.

@liwensun
Contributor Author

Hey @manuzhang, thanks for trying out concurrent appends. I can see how this is confusing. You need to create the table first and then do the concurrent appends. Otherwise each append also performs table creation operations, such as setting up the protocol version and table metadata, which we don't allow concurrently (hence the ProtocolChangedException).
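
A minimal sketch of that ordering, assuming Delta Lake is on the classpath and the caller supplies a working SparkSession (newer Delta releases may need extra session configuration). The object name `ConcurrentAppendSketch`, the method `createThenAppend`, and the `writers` parameter are made up for illustration. The first write runs on its own and creates the table; only after it finishes do the threads start their blind appends.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper for illustration; not part of Delta Lake.
object ConcurrentAppendSketch {

  /** Creates the table with one write, then appends from `writers` threads.
    * Returns the number of rows in the table afterwards. */
  def createThenAppend(spark: SparkSession, path: String, writers: Int = 10): Long = {
    // Step 1: a single, non-concurrent write creates the table,
    // including its protocol version and metadata.
    spark.range(0, 5).write.format("delta").save(path)

    // Step 2: the table now exists, so each thread only adds new data.
    val threads = (1 to writers).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit =
          spark.range(0, 5).write.format("delta").mode("append").save(path)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // wait for every append before counting

    spark.read.format("delta").load(path).count()
  }
}
```

Joining the threads before reading keeps the count meaningful. An append that fails would only surface as an uncaught exception in its own thread, so a real job would probably want to collect failures explicitly.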

@batesl87

Hi @liwensun, is it enough to successfully append data into a folder once, after which you are free to append concurrently, or do you need to explicitly create a Delta table?

The fact that you can create Delta tables by writing into a mounted folder implies this would be fine, but the Databricks docs state that concurrent appends only work within a workspace (https://docs.databricks.com/delta/delta-faq.html#what-are-the-limitations-of-multi-cluster-writes). Does this mean Delta is storing state outside the mounted storage? If so, what commands initialise this shared state?

@liwensun
Contributor Author

liwensun commented Aug 26, 2019

@batesl87 If you are using df.write.format("delta").mode("append").save(...), then you effectively create a Delta table, and yes, you can do concurrent appends, but only from a single cluster if you are using Delta Lake OSS!

The doc you are looking at is for the managed Delta Lake offered by Databricks, not for OSS. In that case you still don't need to do anything extra, as the "shared state" you mention is handled behind the scenes.

zsxwing pushed a commit to zsxwing/delta that referenced this issue Dec 10, 2019
…nflict detection in OptTxnImpl

This is a modified PR from the original PR delta-io#114 by @tomasbartalos (kudos, it was a very good PR!). This PR tracks transaction changes at a finer granularity (no new columns required in RemoveFile action) thus allowing more concurrent operations to succeed.

closes delta-io#228 and delta-io#72

This PR improves the conflict detection logic in OptTxn using the following strategy.
- OptTxn tracks two additional things
  - All the partitions read by the query using the OptTxn
  - All the files read by the query
- When committing a txn, it checks this txn's actions against the actions of concurrently committed txns using the following strategy:
  1. If any of the concurrently added files are in the partitions read by this txn, then fail because this txn should have read them.
      - It’s okay for files to have been removed from the partitions read by this txn as long as this txn never read those files. This is checked by the next rule.
  2. If any of the files read by this txn have already been removed by concurrent txns, then fail.
  3. If any of the files removed by this txn have already been removed by concurrent txns, then fail.
- In addition, I have made another change where setting `dataChange` to `false` in all the actions (enabled by delta-io#223) will ensure the txn will not conflict with any other concurrent txn based on predicates.
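
The three rules above can be sketched in plain Scala as follows. This is a simplified model under stated assumptions, not the code in OptTxnImpl: the types `DataFile`, `TxnFootprint`, and `CommittedTxn` and the function `firstConflict` are hypothetical, partitions are reduced to plain strings, and the `dataChange` exemption is left out.

```scala
// Hypothetical model types for illustration; these are not Delta Lake classes.
final case class DataFile(path: String, partition: String)

// What the committing txn read and what it intends to remove.
final case class TxnFootprint(
    readPartitions: Set[String],
    readFiles: Set[String],
    removedFiles: Set[String])

// What one concurrently committed txn added and removed.
final case class CommittedTxn(added: Seq[DataFile], removed: Set[String])

object ConflictSketch {

  /** Returns the number of the first violated rule, or None if the commit may proceed. */
  def firstConflict(txn: TxnFootprint, winners: Seq[CommittedTxn]): Option[Int] = {
    val added: Seq[DataFile] = winners.flatMap(_.added)
    val removed: Set[String] = winners.flatMap(_.removed).toSet

    if (added.exists(f => txn.readPartitions.contains(f.partition)))
      Some(1) // a file was added to a partition this txn read
    else if (txn.readFiles.exists(removed.contains))
      Some(2) // a file this txn read was removed concurrently
    else if (txn.removedFiles.exists(removed.contains))
      Some(3) // a file this txn removes was already removed concurrently
    else
      None
  }
}
```

In this model, a concurrent removal of a file that lives in a read partition but was never read by this txn passes rule 1 and is caught by neither rule 2 nor rule 3, which matches the note under rule 1.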

Tests written by @tomasbartalos in the original PR. Some tests were changed because some scenarios that were blocked in the original PR are now allowed, thanks to more granular and permissive conflict detection logic. Some test names tweaked to ensure clarity.

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: Tomas Bartalos <tomas.bartalos@nike.sk>

Author: Tathagata Das <tathagata.das1565@gmail.com>

GitOrigin-RevId: f02a8f48838f86d256a86cd40241cdbfa74addb4
@zsxwing
Member

zsxwing commented Dec 10, 2019

Resolved by f328300

@zsxwing zsxwing closed this as completed Dec 10, 2019
@tdas tdas added this to the 0.5.0 milestone Dec 11, 2019
LantaoJin added a commit to LantaoJin/delta that referenced this issue Mar 12, 2021