Allow concurrent writes to partitions that don't interact with each other #9

Closed
calvinlfer opened this issue Apr 25, 2019 · 16 comments

@calvinlfer calvinlfer commented Apr 25, 2019

I have a use case where I would like to update multiple partitions of data at the same time but the partitions are totally separate and do not interact with each other.

For example, I would like to run these queries concurrently (which would be very useful in the case of backfills):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}

// Backfill for 2019-01-01: overwrite only that partition
spark.read.parquet("s3://jim/dataset/v1/valid-records")
  .filter(col("event_date") === lit("2019-01-01"))
  .write
  .partitionBy("event_date")
  .format("delta")
  .mode(SaveMode.Overwrite)
  .option("replaceWhere", "event_date == '2019-01-01'")
  .save("s3://calvin/delta-experiments")

// Backfill for 2019-01-02, meant to run concurrently with the query above
spark.read.parquet("s3://jim/dataset/v1/valid-records")
  .filter(col("event_date") === lit("2019-01-02"))
  .write
  .partitionBy("event_date")
  .format("delta")
  .mode(SaveMode.Overwrite)
  .option("replaceWhere", "event_date == '2019-01-02'")
  .save("s3://calvin/delta-experiments")

So the data above being written as Delta belongs to two separate partitions that do not interact with each other. Consistent with the Delta documentation, what I get when running them concurrently is a com.databricks.sql.transaction.tahoe.ProtocolChangedException: The protocol version of the Delta table has been changed by a concurrent update. Please try the operation again.

Would you support this use-case where you can update partitions concurrently that do not interact with each other?

Parquet seems to allow this just fine (without any corruption if you turn on dynamic partitioning with spark.sql.sources.partitionOverwriteMode). This is a very valid use case if you adhere to Maxime Beauchemin's technique of immutable table partitions.
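
For readers unfamiliar with the setting mentioned above, here is a minimal sketch of a dynamic partition overwrite for plain Parquet. The object and method names are hypothetical, and the partition column `event_date` is taken from the queries earlier in this comment. It assumes Spark 2.3 or later, where `spark.sql.sources.partitionOverwriteMode` is available, and it says nothing about whether concurrent writers are safe on a given file system.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object DynamicPartitionOverwriteSketch {
  // Overwrite only the partitions that actually appear in `df`;
  // partitions under `path` that `df` does not contain are left untouched.
  def overwriteTouchedPartitions(df: DataFrame, path: String): Unit = {
    // The default mode is "static", which would clear every existing partition first.
    df.sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write
      .mode(SaveMode.Overwrite)
      .partitionBy("event_date")
      .parquet(path)
  }
}
```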

@wernerdaehn wernerdaehn commented Apr 25, 2019

I would assume this limitation is because Delta supports ACID within a table. If you have two sessions writing into different partitions, this would need a different transaction handling compared to the situation where a single writer writes into all partitions.
Might be harder to implement than it looks at first sight.

Having said that, I would love to have such an option as well. There will be situations where you need mass data loads and would be okay with a relaxed transaction guarantee. And there will be situations where transaction guarantees are more important than mass data performance.

my2cents

@hackmad hackmad commented Apr 25, 2019

I can appreciate the challenges in designing something like this. However, it means that existing processes that use the Parquet format to load partitions simultaneously cannot be converted to Delta. It essentially serializes all stages that could otherwise run in parallel. It might be worth having an option to load all the data first and then update the partition information in the metastore, similar to what you have to do in Athena. If a new pipeline is created, we would have to work around this by first loading the secondary partition-level data into their own S3 locations without partitioning and then organizing them later. This would add the overhead of additional storage (which could be mitigated with retention policies in S3) and, more importantly, would more than double the compute cost, because the data is processed a second time.

Contributor

@tdas tdas commented Apr 25, 2019

First of all, thank you very much for trying out Delta Lake! The current version (0.1.0) has a very restrictive conflict detection check to be absolutely safe. In future releases, we will slowly relax the conflict criteria to allow more concurrency while ensuring the ACID guarantees. Hopefully, we will be able to make such workloads easier.
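
Until the conflict check is relaxed, the error message's own advice ("Please try the operation again") can be followed mechanically. The following is only a sketch under assumptions, not something Delta Lake provides: `retryOnConflict` and `isConflict` are hypothetical names, and it assumes the write is safe to re-run, as an overwrite with replaceWhere on a single partition should be. Retrying serializes the conflicting writers rather than making them truly concurrent.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetryOnConflictSketch {
  // Runs `op`, and re-runs it while it fails with an exception that
  // `isConflict` accepts, for at most `attemptsLeft` attempts in total.
  // Any other failure, or the last conflict, is rethrown to the caller.
  @tailrec
  def retryOnConflict[A](attemptsLeft: Int, isConflict: Throwable => Boolean)(op: => A): A =
    Try(op) match {
      case Success(value) => value
      case Failure(e) if attemptsLeft > 1 && isConflict(e) =>
        retryOnConflict(attemptsLeft - 1, isConflict)(op)
      case Failure(e) => throw e
    }

  // Hypothetical usage; matching on the class name avoids compiling
  // against a particular Delta build:
  //   val isProtocolChange =
  //     (e: Throwable) => e.getClass.getName.endsWith("ProtocolChangedException")
  //   retryOnConflict(3, isProtocolChange) { /* the df.write ... save(path) call */ }
}
```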

@tdas tdas added the enhancement label Apr 25, 2019

@ligao101 ligao101 commented Apr 25, 2019

Hello, I am looking into how Delta would work on an S3-based data lake. What are the current limitations of running Delta Lake on S3? This issue appears to be one of the potential limitations we could see. Thanks!

Contributor

@tdas tdas commented Apr 25, 2019

Delta Lake currently provides its full guarantees only on HDFS, because HDFS provides the file system operation guarantees that Delta Lake's consistency guarantee relies on. The S3 file system does not provide those guarantees yet, primarily because S3 does not provide list-after-write guarantees.
Details on the required guarantees - https://github.com/delta-io/delta#transaction-protocol

PS: This is a completely different issue than the original issue in this thread. Please open a separate issue for this.

Collaborator

@liwensun liwensun commented Jun 19, 2019

Thanks for sharing your use case and the great discussions. I have created a concurrency support tracking issue which references this issue so people can see the use cases and discussions here.

@liwensun liwensun closed this Jun 19, 2019
@liwensun liwensun reopened this Jun 19, 2019

@koertkuipers koertkuipers commented Aug 28, 2019

@calvinlfer i agree that concurrent writes to totally separate partitions would be great.

however i was surprised to hear you say parquet supports this just fine. we have run into issues with this using dynamic partitionOverwriteMode and partitionBy, because both writers try to create temporary files in baseDir/_temporary, leading to weird errors when one finishes and deletes _temporary while the other job is still running. just FYI.

@koertkuipers koertkuipers commented Aug 29, 2019

i am not sure it is straightforward to safely allow concurrent writes that replace partitions. optimistic transaction seems to know what files were added or deleted, but that's not the same as knowing what the intent of the transaction was.

for example a transaction might have had the intent to replace everything where say a=1, so replaceWhere("a=1"), and let's say there was nothing to delete and it only wrote out to a=1/b=1/part.snappy
if another transaction ran at the same time and also had a replaceWhere("a=1") and also deleted nothing but created a file a=1/b=2/part.snappy, then by just looking at the file actions they do not seem to be in conflict, but they are: each one intended to replace all of a=1, so only one of the two files should survive.
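
To make the example above concrete, here is a minimal sketch in plain Scala with no Spark or Delta dependency. `Txn`, `conflictsByFiles` and `conflictsByIntent` are hypothetical names, not Delta classes; the sketch only models a transaction as the partition filter it intended to replace plus the files it added and removed.

```scala
object ReplaceWhereConflictSketch {
  // Hypothetical model of a transaction. `replaced` is the partition filter
  // the writer intended to overwrite, e.g. Map("a" -> "1") for replaceWhere("a=1").
  final case class Txn(
      replaced: Map[String, String],
      added: Set[String],
      removed: Set[String])

  // Parse "a=1/b=2/part.snappy" into Map("a" -> "1", "b" -> "2").
  def partitionValues(path: String): Map[String, String] =
    path.split("/").iterator
      .filter(_.contains("="))
      .map { segment =>
        val i = segment.indexOf('=')
        segment.substring(0, i) -> segment.substring(i + 1)
      }
      .toMap

  // True if the file at `path` lies inside the region selected by `filter`.
  def matches(filter: Map[String, String], path: String): Boolean = {
    val values = partitionValues(path)
    filter.forall { case (k, v) => values.get(k).contains(v) }
  }

  // File-level check: a conflict only if both transactions touched the same file.
  def conflictsByFiles(x: Txn, y: Txn): Boolean =
    ((x.added ++ x.removed) & (y.added ++ y.removed)).nonEmpty

  // Intent-level check: a conflict if either transaction added a file inside
  // the region that the other one intended to replace.
  def conflictsByIntent(x: Txn, y: Txn): Boolean =
    y.added.exists(matches(x.replaced, _)) || x.added.exists(matches(y.replaced, _))
}
```

With the two transactions from the example, `conflictsByFiles` reports no conflict while `conflictsByIntent` does, which is the gap between file actions and intent described above.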

@koertkuipers koertkuipers commented Aug 29, 2019

note that for the example of dynamic partition overwrite (which is not in delta but we added on our own branch) it is easy to reason about, because the files deleted are always in exact same partitions as where files are added, so you only need to check for conflicts with respect to added files (e.g. verify the transactions did not write to exact same partitions).
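
The check described above can be sketched in a few lines of plain Scala. This is an illustration under assumptions, not the code from the branch mentioned: the names are hypothetical, and a file's partition is taken to be its parent directory relative to the table root.

```scala
object DynamicOverwriteConflictSketch {
  // Partition directory of a data file, i.e. everything before the last '/'.
  // "partition=1/part-0000.parquet" becomes "partition=1".
  def partitionDir(path: String): String = {
    val i = path.lastIndexOf('/')
    if (i < 0) "" else path.substring(0, i)
  }

  // The set of partitions a transaction wrote to, derived from its added files.
  def writtenPartitions(addedFiles: Set[String]): Set[String] =
    addedFiles.map(partitionDir)

  // With dynamic partition overwrite, deletes happen only in partitions that
  // also receive new files, so comparing added files is enough: two
  // transactions conflict exactly when they wrote to a common partition.
  def conflict(addedByX: Set[String], addedByY: Set[String]): Boolean =
    (writtenPartitions(addedByX) & writtenPartitions(addedByY)).nonEmpty
}
```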

Author

@calvinlfer calvinlfer commented Aug 29, 2019

Hey @koertkuipers, what version and flavor of Spark are you using? @hackmad and I have seen this work at scale on the Databricks platform with Spark 2.4.1.

@koertkuipers koertkuipers commented Aug 29, 2019

@calvinlfer we use spark 2.4.1 on hadoop 2.7
however i am a little uncertain if that's the version we observed the issue with or if it was an earlier version and we have avoided the situation ever since. i remember the errors being hdfs lease exceptions because one job would delete the _temporary directory while the other was still using it.

@koertkuipers koertkuipers commented Aug 29, 2019

@calvinlfer maybe things changed for the better. i now see when i run two jobs writing to different partitions using partitionBy and dynamic partitionOverwriteMode:

drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:17 out/_temporary/0

so it seems each job has its own .spark-staging directory, and _temporary isn't really used? not sure...

Author

@calvinlfer calvinlfer commented Aug 30, 2019

Sorry, I should have mentioned this more explicitly earlier: we used S3 instead of HDFS, so I believe the underlying implementation is quite different and allows concurrent writes to non-conflicting partitions.

@hospadar hospadar commented Sep 6, 2019

Wanted to add to this - this would be a blocker for us as well to switch from parquet to delta.

Right now we store our underlying data in s3 as parquet and do management of partitions fairly manually to keep tables in a happy state. We always write out new (or replacement) partitions to a new folder then just swap the location of the partition in the metastore to make it look like an atomic update to anyone querying the data downstream (also allows us to theoretically roll back an update, although that's impractical for us and requires log spelunking to find the old paths).

We often do big backfill/reprocessing jobs where we process tons of dates in parallel to keep the cluster over-committed. If we could only write one partition at once our throughput would slow down quite a bit on jobs like this.

I'd love to switch to delta, it would make it MUCH easier to revert data to earlier states (and a variety of other things would become more convenient for us), but this issue is probably a blocker.

Our logic goes something like:

import java.util.UUID

// First thread is doing something like:
val path = "s3://warehouse/" + UUID.randomUUID().toString
dataframe.write.parquet(path)
// Point the partition at the freshly written location
spark.sql(s"ALTER TABLE target PARTITION (dt='2019-01-01') SET LOCATION '$path'")

// Another thread is doing the same thing, but for a different date
val path2 = "s3://warehouse/" + UUID.randomUUID().toString
dataframe.write.parquet(path2)
// Register the second dataframe to a different partition
spark.sql(s"ALTER TABLE target PARTITION (dt='2019-01-02') SET LOCATION '$path2'")

@koertkuipers koertkuipers commented Sep 7, 2019

@calvinlfer i did some more checking and the issue of writers conflicting with each other when writing to same baseDir with dynamic partition overwrite does still exist in spark 2.4 and spark master for all file sources. i cannot say anything about writing to s3, that could be very different.
for more info please see (and vote for):
https://issues.apache.org/jira/browse/SPARK-28945

Contributor

@tdas tdas commented Dec 11, 2019

We have improved our concurrency control in this commit - f328300

This allows operations that write to disjoint partitions to run concurrently.

@tdas tdas modified the milestones: Future Roadmap, 0.5.0 Dec 11, 2019
@tdas tdas closed this Dec 11, 2019