-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PECO-1751] Refactor CloudFetch downloader: handle files sequentially #405
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…; utilize Futures Signed-off-by: Levko Kravets <levko.ne@gmail.com>
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
kravets-levko
requested review from
rcypher-databricks,
yunbodeng-db,
andrefurlan-db,
jackyhu-db and
benc-db
as code owners
July 2, 2024 15:23
kravets-levko
changed the title
[PECO-1751] Refactor CloudFetch downloader: handle files sequentially (WIP)
[PECO-1751] Refactor CloudFetch downloader: handle files sequentially
Jul 2, 2024
jackyhu-db
reviewed
Jul 2, 2024
jackyhu-db
approved these changes
Jul 10, 2024
kravets-levko
added a commit
to databricks/databricks-sql-go
that referenced
this pull request
Aug 23, 2024
[PECO-1752] This PR is an attempt to fix a CloudFetch error "row number N is not contained in any arrow batch". See also databricks/databricks-sql-python#405 - basically, the same issue, the same root cause, similar solution. #### The problem In current implementation, all the links from a single TRowSet are added to a concurrent thread (goroutine) pool. The pool downloads them in a random order (all the tasks have the same priority and as a result - same chance to be executed first). To maintain the order of results, `startRowOffset`/`rowCount` fields from each CloudFetch link are used: library keeps track of the current row number, and use it to pick the right CloudFetch link (looking for the file where the current row is within [startRowOffset; startRowOffset + rowCount]). This solution has several caveats. First of all, library allows to fetch data only from beginning to the end. With a concurrent thread pool, you never know which file will be downloaded first. In the worst case, while the user is waiting for the very first file, the library may download all the other ones and keep them in memory because the user may need them in future. This increases the latency (on average it will be okay, but we have no control over it), and also memory consumption. Another problem with this approach is that if any of the files cannot be downloaded - there is no need to download the remaining files, the user won’t be able to process them anyway. But because files are downloaded in arbitrary order - nobody knows how many files will be downloaded before the user reaches the failed one. Also, seems that error handling wasn't done quite right, but that part of code was a bit unclear to me. Anyway, with this fix all the errors are properly handled and propagated to user when needed. #### The solution This PR changes CloudFetch downloader to use a queue. Downloader keeps a list of pending links (not scheduled), and current tasks. Number of tasks is limited, so new files are scheduled only when previous task is completed and extracted from queue. As user requests next files, downloader will pick the first task from the queue, and schedule the new one to run in background - to keep the queue full. Then, downloader will wait for the task it picked from the queue, and then return it to user. Tasks are still running in in parallel in background. Also, each task itself is reponsible for handling errors (e.g. retry failed downloads), so when task completes - it is either eventually successfull, or failed after all possible retries. With this approach, the proper order of files is automatically assured. All errors are either handled in downloader or propagated to user. If some file cannot be downloaded due to error - library will not download the remaining ones (like it did previously). Because new files are downloaded only when user consumes previous ones - library will not keep the whole dataset in memory. [PECO-1752]: https://databricks.atlassian.net/browse/PECO-1752?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
PECO-1751
This PR is an attempt to fix various CloudFetch-related issues (no data returned, duplicated rows returned, also maybe some performance and memory issues).
Replaces #362
The problem
In current implementation, all the links from a single TRowSet are added to a concurrent thrrad pool. The pool downloads them in a random order (all the tasks have the same priority and as a result - same chance to be executed first). To maintain the order of results,
startRowOffset
/rowCount
fields from each CloudFetch link are used: library keeps track of the current row number, and use it to pick the right CloudFetch link (looking for the file where the current row is within [startRowOffset; startRowOffset + rowCount]).This solution has several caveats. First of all, data can be fetched only from beginning to the end. With a concurrent thread pool, you never know which file will be downloaded first. In the worst case, while the user is waiting for the very first file, the library may download all the other ones and keep them in memory because the user may need them in future. This increases the latency (on average it will be okay, but we have no control over it), and also memory consumption.
Another problem with this approach is that if any of the files cannot be downloaded (even after retries) - there is no need to download the remaining files, the user won’t be able to process them anyway. But because files are downloaded in arbitrary order - nobody knows how many files will be downloaded before the user reaches the failed one.
Also, instead of utilizing the power of Futures, a custom synchronization and error handling is implemented - which may be a cause of synchronization issues. Additionally, retries are implemented outside of the download task, so when the task fails - outer code has to reschedule it, which is also a great source of sporadic errors.
The solution
This PR changes CloudFetch downloader to use a queue. Downloader keeps a list of pending links (not scheduled), and current tasks. Number of tasks is limited, so new files are scheduled only when previous task is completed and extracted from queue. As user requests next files, downloader will pick the first task from the queue, and schedule the new one to run in background - to keep the queue full. Then, downloader will wait for the task it picked from the queue, and then return it to user. Tasks are still running in in parallel in background. Also, each task itself is reponsible for handling errors (e.g. retry failed downloads), so when task completes - it is either eventually successfull, or failed after all possible retries.
With this approach, the proper order of files is automatically assured. All errors are either handled in downloader or propagated to user. If some file cannot be downloaded due to error - library will not download the remaining ones (like it did previously). Utilizing Futures solves all possible issues with threads synchronization. Because new files are downloaded only when user consumes previous ones - library will not keep the whole dataset in memory.