[Concurrency Control] Allow compaction against blind-appendings when check conflicts #228

Closed
xianyinxin opened this issue Oct 29, 2019 · 12 comments
@xianyinxin xianyinxin commented Oct 29, 2019

There are already several optimizations for concurrent operations on the delta log, such as concurrent blind appends. It would also be useful to allow no-data-change operations (a typical example is compaction) to commit concurrently with blind appends, such as streaming writes.

This can be seen as a subtask of #72, and may depend on #146 being finished.

@xianyinxin xianyinxin changed the title [Concurrency Control] Allow no-data-change operation (like compaction) against blind-append operations. [Concurrency Control] Allow no-data-change operation (like compaction) against blind-appendings. Oct 29, 2019
@xianyinxin xianyinxin commented Nov 1, 2019

cc @liwensun @tdas @marmbrus, could you advise whether this optimization is feasible? If so, I'd like to propose a pull request.

@marmbrus marmbrus commented Nov 1, 2019

I'd need to see more design to understand whether this is something feasible to implement for Delta. Specifically, rather than understanding which operations you want to perform at the same time, I'd like to understand the guarantees that this isolation mode provides.

@xianyinxin xianyinxin commented Nov 4, 2019

Thanks for your reply @marmbrus. When I ran compaction while another thread was streaming data into Delta, I found the compaction would sometimes fail because of commit conflicts. Since the streaming job was just blindly appending small files that have nothing to do with the historical data, the compaction should not have to fail, because it didn't change any data. In this case, we could allow the compaction to retry the commit, just as concurrent appends do.

Maybe I'd better narrow the "no-data-change operations" in the title down to "compaction".

@xianyinxin xianyinxin changed the title [Concurrency Control] Allow no-data-change operation (like compaction) against blind-appendings. [Concurrency Control] Allow compaction against blind-appendings when check conflicts Nov 4, 2019
@rahulsmahadev rahulsmahadev commented Nov 4, 2019

@xianyinxin

This comment has been minimized.

Copy link
Author

@xianyinxin xianyinxin commented Nov 4, 2019

Thanks @rahulsmahadev, 009c949 is helpful but not enough. There's a check in OptimisticTransactionImpl#checkAndRetry:

      // Fail if the data is different than what the txn read.
      if (dependsOnFiles && fileActions.nonEmpty) {
        throw new ConcurrentWriteException(commitInfo)
      }

and I'm afraid the compaction would fail here, because it "dependsOnFiles" and the winning commit's fileActions is non-empty.

I'll do a test and post the result later.

@cfmcgrady cfmcgrady commented Nov 12, 2019

I also hit the same problem. Consider the following case:
Application 1: a structured streaming job with a short trigger interval that appends data into a Delta table partitioned by date.
Application 2: a daily optimize (e.g. compaction) job on the same table.

Note that the compaction always takes a long time to complete, possibly hours. So when the compaction commits, the concurrency check fails, because the compaction depends on files and fileActions.nonEmpty is true, as @xianyinxin mentioned above.

@marmbrus marmbrus commented Nov 12, 2019

@xianyinxin I don't think you can just get rid of that check, though. That would make the results of concurrent merges incorrect. Consider the case where two concurrent merges attempt to modify the same key: they both read the table, and it is not valid for them to proceed if the data they read has been changed.

I'd be happy to relax things here, but we need a design / proof that the changes are not breaking ACID / serializability.

@xianyinxin xianyinxin commented Nov 13, 2019

Thanks @marmbrus for your suggestion.

> I don't think you can just get rid of that check though. That would make the results of concurrent merges incorrect. Consider the case where two concurrent merges attempt to modify the same key. They both read the table, and it is not valid for them to proceed if the data they read has been changed.

I'm not going to get rid of that check, just relax it in certain cases.

> I'd be happy to relax things here, but we need a design / proof that the changes are not breaking ACID / serializability.

My thinking is: suppose there is a streaming append job that generates files sequentially,

A -> B -> C -> D -> E

At point E, we start a compaction job. The expected result of the job would be

[A -> B -> C -> D -> E] -> F

where F stands for all of the valid data from A to E. However, as @cfmcgrady pointed out, the compaction job may take a long time. Meanwhile, the streaming append job is still running, and when the compaction job commits, it might hit a conflict:

Streaming: A -> B -> C -> D -> E -> F -> G -> H
Compaction: [A -> B -> C -> D -> E] -> F'

But F, G, H are just blind appends; they have nothing to do with A, B, C, D, E, so it would be better if the final result were

F' -> F -> G -> H

where F' stands for [A -> B -> C -> D -> E]. Of course we cannot just commit F' as-is; we need to retry the commit until it succeeds:

Streaming: A -> B -> C -> D -> E -> F -> G -> H
Compaction: [A -> B -> C -> D -> E] -> try F -> try G -> try H -> I

where I stands for [A -> B -> C -> D -> E].

I'm not sure if I'm missing something, but intuitively it should work.
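The retry scheme above can be sketched as a simple loop (a minimal, self-contained sketch; the CommitAttempt type and version handling are hypothetical simplifications, not Delta's actual API):

```scala
object RetrySketch {
  // Hypothetical: returns true if the commit at `version` succeeds,
  // false if a concurrent (blind-append) commit already took that version.
  type CommitAttempt = Long => Boolean

  // Keep retrying against each newer table version until the commit lands,
  // the way "try F -> try G -> try H -> I" does above.
  def commitWithRetry(attempt: CommitAttempt, startVersion: Long, maxRetries: Int): Option[Long] = {
    var version = startVersion
    var tries = 0
    while (tries < maxRetries) {
      if (attempt(version)) return Some(version)
      version += 1 // a concurrent blind append won this version; try the next one
      tries += 1
    }
    None // give up after too many conflicts
  }
}
```

For instance, if concurrent appends F, G, H win versions 5 through 7, the compaction finally lands at version 8.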

@cfmcgrady cfmcgrady commented Nov 19, 2019

Hi @marmbrus,
Here is a full example to reproduce it:

  test("just for replay") {
    withTempDir { tempDir =>
      val log = DeltaLog(spark, new Path(tempDir.getCanonicalPath))
      // Initialize the log. Truncate() is just a no-op placeholder.
      log.startTransaction().commit(Nil, Truncate())

      // Mock a streaming application that appends new data to the table.
      appendNewData(log, "a")
      appendNewData(log, "b")
      appendNewData(log, "c")
      appendNewData(log, "d")
      appendNewData(log, "e")

      // Start a new application that optimizes the table by doing compaction.
      val compactionTxn = log.startTransaction()

      // Note that the compaction usually takes a long time,
      // while the streaming application keeps appending new data.
      appendNewData(log, "f")
      appendNewData(log, "g")
      appendNewData(log, "h")

      // Compaction finished; now try to commit.
      val compactionFile = AddFile("f'", Map.empty, 1, 1, dataChange = false)
      val fileToRemove = compactionTxn.filterFiles().map(_.remove).map(_.copy(dataChange = false))

      compactionTxn.commit(fileToRemove ++ Seq(compactionFile), Truncate())
    }
  }

  def appendNewData(deltaLog: DeltaLog, fileToAdd: String): Unit = {
    val addFile = AddFile(fileToAdd, Map.empty, 1, 1, dataChange = true)
    deltaLog.startTransaction().commit(addFile :: Nil, Truncate())
  }

Unfortunately, the compaction commit fails with a ConcurrentWriteException because of the concurrency check mechanism, as @xianyinxin mentioned above.

@cfmcgrady cfmcgrady commented Nov 19, 2019

I have a suggestion: we could update the concurrency check condition. For data-rearrange operations, we should check whether this transaction's RemoveFile actions conflict with the winning commit's RemoveFile actions.

// Pseudo code

      if (isRearrangeOperation) {
        if (winningCommit.removeFiles.intersect(removeFiles).nonEmpty) {
          throw new ConcurrentWriteException(commitInfo)
        }
      } else {
        // Fail if the data is different than what the txn read.
        if (dependsOnFiles && fileActions.nonEmpty) {
          throw new ConcurrentWriteException(commitInfo)
        }
      }

But there is a risk in the update case if we set dataChange = false.

@marmbrus marmbrus commented Nov 19, 2019

I understand the case that does not work, but what I do not understand are the formal properties of the isolation level that you are proposing we implement instead.

Right now it is very simple. The transaction log is strictly-serializable (i.e. the result is the same as though you ran each transaction one at a time, even though they might have actually run concurrently).

zsxwing added a commit to zsxwing/delta that referenced this issue Dec 10, 2019
…nflict detection in OptTxnImpl

This is a modified PR from the original PR delta-io#114 by @tomasbartalos (kudos, it was a very good PR!). This PR tracks transaction changes at a finer granularity (no new columns required in RemoveFile action) thus allowing more concurrent operations to succeed.

closes delta-io#228 and delta-io#72

This PR improves the conflict detection logic in OptTxn using the following strategy.
- OptTxn tracks two additional things
  - All the partitions read by the query using the OptTxn
  - All the files read by the query
- When committing a txn, it checks this txn's actions against the actions of concurrently committed txns using the following strategy:
  1. If any of the concurrently added files are in the partitions read by this txn, then fail because this txn should have read them.
      - It’s okay for files to have been removed from the partitions read by this txn as long as this txn never read those files. This is checked by the next rule.
  2. If any of the files read by this txn have already been removed by concurrent txns, then fail.
  3. If any of the files removed by this txn have already been removed by concurrent txns, then fail.
- In addition, I have made another change where setting `dataChange` to `false` in all the actions (enabled by delta-io#223) will ensure the txn will not conflict with any other concurrent txn based on predicates.

Tests written by @tomasbartalos in the original PR. Some tests were changed because some scenarios that were blocked in the original PR are now allowed, thanks to more granular and permissive conflict detection logic. Some test names tweaked to ensure clarity.

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: Tomas Bartalos <tomas.bartalos@nike.sk>

Author: Tathagata Das <tathagata.das1565@gmail.com>

GitOrigin-RevId: f02a8f48838f86d256a86cd40241cdbfa74addb4
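The three conflict rules in the commit message above can be sketched as a small, self-contained predicate (the Txn/WinningCommit shapes and names here are hypothetical simplifications for illustration, not Delta's actual classes):

```scala
object ConflictRulesSketch {
  // Hypothetical simplified view of the committing transaction.
  case class Txn(
      readPartitions: Set[String], // partitions read by the query
      readFiles: Set[String],      // files read by the query
      removedFiles: Set[String])   // files this txn removes (e.g. compacted away)

  // Hypothetical simplified view of a concurrently committed transaction.
  case class WinningCommit(
      addedFiles: Map[String, String], // added file -> its partition
      removedFiles: Set[String])

  def conflicts(txn: Txn, winning: WinningCommit): Boolean = {
    // Rule 1: fail if a concurrently added file lands in a partition this txn read.
    val addedIntoReadPartition =
      winning.addedFiles.values.exists(txn.readPartitions.contains)
    // Rule 2: fail if a file this txn read was removed concurrently.
    val readFileRemoved = txn.readFiles.intersect(winning.removedFiles).nonEmpty
    // Rule 3: fail if a file this txn removes was already removed concurrently.
    val doubleRemove = txn.removedFiles.intersect(winning.removedFiles).nonEmpty
    addedIntoReadPartition || readFileRemoved || doubleRemove
  }
}
```

Under this sketch, a compaction that read only partition "date=2019-11-01" would not conflict with a blind append into "date=2019-11-12", while a concurrent removal of one of the compacted files would still fail the commit.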
@zsxwing zsxwing closed this in f328300 Dec 10, 2019
@tdas tdas added this to the 0.5.0 milestone Dec 11, 2019