-
Notifications
You must be signed in to change notification settings - Fork 670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(perf): Distribute timestream write with executor #1715
Conversation
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
) | ||
return [item for sublist in res for item in sublist] | ||
) | ||
return _flatten_list(ray_get(errors)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two _flatten_list(ray_get())
where required here because of the imbricated ray remote methods (_write_df
and _write_batch
). This is not needed in S3 select for instance because we feed the ray reference ids from the first ray_get to a Ray dataset
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
@@ -43,6 +44,11 @@ def _to_modin( | |||
) | |||
|
|||
|
|||
def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> List[ObjectRef[Any]]: # pylint: disable=unused-argument |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not 100% convinced that this is the best way to split a modin dataframe
version: int, | ||
boto3_session: Optional[boto3.Session] = None, | ||
) -> List[Dict[str, str]]: | ||
batches: List[List[Any]] = _utils.chunkify(lst=_df2list(df=df), max_length=100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the split modin dataframe block reference id is received. I assume modin/ray is smart enough to avoid a shuffle (i.e. pulling a block from one worker to another) and would instead run the remote functions (_write_df
and _write_batch
) in the worker where the block already exists...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The blocks would be broken down into batches and sent to workers so unfortunately some shuffle or rather copy will inevitably happen. One thing I'm afraid of is max_length=100
- these would be too fine-grained tasks, might not be worth it because of the overhead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point, the load test on 64,000 rows was fine but let me check with an even larger one tomorrow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me overall 👍
Consider increasing batch size to avoid too fine-grained tasks as per the comment above
6586dbd
to
21b56c5
Compare
Feature or Bugfix
Detail
In non-distributed case, DataFrame is split into smaller sub-dfs based on # of threads.
In distributed case, sub-dfs are obtained from ray object reference ids which are then submitted to the two ray remote methods (
_write_df
and_write_batch
)By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.