Handle rate limiting during write contention #2451

Closed · adriangb opened this issue Apr 24, 2024 · 5 comments · Fixed by #2454
Labels
bug Something isn't working

Comments

@adriangb
Contributor

During concurrent writes there is contention for the next commit .json file.
GCS has a hard limit of one mutation operation per second per object, so that limit is hit immediately.
The resulting 429 should be interpreted as "someone else already wrote that key; advance to the next commit id and try again", but it is instead surfaced as an error:

Error: Generic DeltaTable error: Version mismatch
Error: Generic GCS error: Client error with status 429 Too Many Requests: <?xml version='1.0' encoding='UTF-8'?><Error><Code>SlowDown</Code><Message>The object exceeded the rate limit for object mutation operations (create, update, and delete). Please reduce your request rate. See https://cloud.google.com/storage/docs/gcs429.</Message><Details>The object bucket-00d0331/test/delta/_delta_log/00000000000000000021.json exceeded the rate limit for object mutation operations (create, update, and delete). Please reduce your request rate. See https://cloud.google.com/storage/docs/gcs429.</Details></Error>
Error: Generic DeltaTable error: Version mismatch
Error: Generic GCS error: Client error with status 429 Too Many Requests: <?xml version='1.0' encoding='UTF-8'?><Error><Code>SlowDown</Code><Message>The object exceeded the rate limit for object mutation operations (create, update, and delete). Please reduce your request rate. See https://cloud.google.com/storage/docs/gcs429.</Message><Details>The object bucket-00d0331/test/delta/_delta_log/00000000000000000019.json exceeded the rate limit for object mutation operations (create, update, and delete). Please reduce your request rate. See https://cloud.google.com/storage/docs/gcs429.</Details></Error>
Error: Generic GCS error: Client error with status 429 Too Many Requests: <?xml version='1.0' encoding='UTF-8'?><Error><Code>SlowDown</Code><Message>The object exceeded the rate limit for object mutation operations (create, update, and delete). Please reduce your request rate. See https://cloud.google.com/storage/docs/gcs429.</Message><Details>The object bucket-00d0331/test/delta/_delta_log/00000000000000000019.json exceeded the rate limit for object mutation operations (create, update, and delete). Please reduce your request rate. See https://cloud.google.com/storage/docs/gcs429.</Details></Error>
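
In other words, the hoped-for handling is roughly the retry loop sketched below (commit_with_retry, latest_version and put_commit are hypothetical placeholders, not delta-rs APIs):

import time


class CommitConflict(Exception):
    """Another writer already created the target commit file."""


class RateLimited(Exception):
    """GCS returned 429 SlowDown for the commit object."""


def commit_with_retry(latest_version, put_commit, actions, max_attempts=50):
    # latest_version() returns the newest committed version; put_commit(v, actions)
    # raises CommitConflict or RateLimited when the write for version v loses the race.
    for _ in range(max_attempts):
        version = latest_version() + 1
        try:
            put_commit(version, actions)
            return version
        except (CommitConflict, RateLimited):
            # Treat the 429 exactly like a lost race: wait out GCS's
            # one-mutation-per-second-per-object limit, then re-resolve the
            # latest version and try the next commit id.
            time.sleep(1.0)
    raise RuntimeError('could not commit after repeated contention')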

Test code:

import os

os.environ['RUST_LOG'] = 'INFO'

from time import time
from concurrent.futures import ProcessPoolExecutor

import polars as pl
from deltalake import DeltaTable, write_deltalake
from google.cloud import storage



def worker(uri: str, df: pl.DataFrame) -> None:
    table = DeltaTable(uri, log_buffer_size=1024 * 1024)
    table.update_incremental()

    # Append the same small frame in a tight loop so that all workers race
    # for the same next commit .json file in _delta_log.
    print('writing in a loop')
    while True:
        try:
            start = time()
            write_deltalake(table, df.to_arrow(), engine='rust', mode='append')
            print('Time taken:', time() - start)
        except Exception as e:
            print('Error:', e)
            continue


if __name__ == '__main__':
    print('Deleting all files in the bucket')
    client = storage.Client()
    bucket = client.get_bucket('bucket-00d0331')
    for blob in bucket.list_blobs(prefix='test/delta'):
        blob.delete()
    
    df = pl.DataFrame({
        'a': [1, 2, 3],
        'b': [4, 5, 6],
    })


    uri = 'gs://bucket-00d0331/test/delta'

    print('initializing delta table')
    write_deltalake(uri, df.to_arrow(), engine='rust', mode='append')

    with ProcessPoolExecutor(32) as executor:
        futures = [
            executor.submit(worker, uri, df)
            for _ in range(32)
        ]
        for future in futures:
            future.result()
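
For what it's worth, the reproduction does make progress if the worker backs off with jitter after a failed append, though that only works around the handling being asked for here; a sketch of such a worker (same deltalake/polars usage as above):

import random
import time

import polars as pl
from deltalake import DeltaTable, write_deltalake


def worker_with_backoff(uri: str, df: pl.DataFrame) -> None:
    # Same as worker() above, but sleeps with jitter after a failed append so
    # the 32 writers collectively stay near GCS's one-write-per-second-per-object limit.
    table = DeltaTable(uri, log_buffer_size=1024 * 1024)
    table.update_incremental()
    while True:
        try:
            write_deltalake(table, df.to_arrow(), engine='rust', mode='append')
        except Exception as e:
            print('Error:', e)
            time.sleep(random.uniform(0.5, 1.5))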
adriangb added the bug label on Apr 24, 2024
@adriangb
Contributor Author

cc @ion-elgreco since you’re on a roll

@ion-elgreco
Collaborator

ion-elgreco commented Apr 25, 2024

@adriangb this one is a little bit more tricky, since we need to customize the deltalake-gcp crate: https://github.com/delta-io/delta-rs/blob/main/crates/gcp/src/lib.rs.

Also, this might already be fixed once we use a conditional put: #2296

@adriangb
Contributor Author

I don't think this will be fixed by #2296. Per the GCP docs:

Maximum rate of writes to the same object name: one write per second. Writing to the same object name at a rate above the limit might result in throttling errors. For more information, see Object immutability.

So if you try to do a PUT (of any kind) to the same key more than once per second, you get a 429.

If two writers try to commit the same version within a second of each other this will happen; I don't think it matters whether the write is done via a PUT or a copy.
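
For a standalone illustration of that limit, here is a small sketch using the same google-cloud-storage client as the repro (bucket and object names are placeholders): conditional overwrites of a single object name start returning 429 as soon as they arrive faster than once per second.

import time

from google.api_core import exceptions
from google.cloud import storage

BUCKET = 'bucket-00d0331'          # placeholder: any bucket you control
KEY = 'test/rate-limit-demo.json'  # placeholder object name

client = storage.Client()
blob = client.bucket(BUCKET).blob(KEY)
blob.upload_from_string(b'0')  # initial write establishes a generation

for i in range(10):
    try:
        # Conditional overwrite against the current generation (the same kind
        # of compare-and-swap a conditional PUT gives us); retry=None so the
        # client does not hide the 429 behind its own retries.
        blob.upload_from_string(str(i).encode(),
                                if_generation_match=blob.generation,
                                retry=None)
        print(i, 'ok')
    except exceptions.PreconditionFailed:
        print(i, 'generation changed under us')
    except exceptions.TooManyRequests as e:
        print(i, '429 SlowDown:', e)
        time.sleep(1.0)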

@ion-elgreco
Collaborator

ion-elgreco commented Apr 25, 2024

@adriangb I see, yeah then there should be a fast path in the deltalake-gcp crate that handles that 429 error.

I see you know how to write Rust, so maybe you could take a look at that :P (I don't have GCP myself)

@adriangb
Contributor Author

I'm not sure what you mean by "fast path".

I gave it a quick shot. I'm just not sure how we're supposed to deal with that error.

I'm also happy to give you access to a test bucket if you'd like :)
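
For illustration, one possible reading of that "fast path" is sketched below in Python (hypothetical names; the actual change would live in the Rust deltalake-gcp crate): map a 429 on a commit object to the same conflict error the commit loop already retries on.

class TooManyRequestsError(Exception):
    """Stand-in for the store's 429 / SlowDown error."""


class CommitConflict(Exception):
    """Signals the caller to re-resolve the latest version and retry."""


class RateLimitAwareStore:
    # Hypothetical wrapper around a GCS-backed store: a 429 on a
    # _delta_log/*.json key is reported as a commit conflict instead of a
    # generic error, so the normal optimistic-concurrency retry path kicks in.
    def __init__(self, inner):
        self.inner = inner

    def put_if_absent(self, key: str, data: bytes):
        try:
            return self.inner.put_if_absent(key, data)
        except TooManyRequestsError:
            if '_delta_log/' in key and key.endswith('.json'):
                raise CommitConflict(key)
            raise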
