Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distribute S3 select over multiple paths and scan ranges #1445

Merged
merged 7 commits into from
Jul 15, 2022

Conversation

jaidisido
Copy link
Contributor

Feature or Bugfix

  • Feature

Detail

  • Distribute S3 select over multiple paths and scan ranges

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@jaidisido jaidisido changed the base branch from release-3.0.0 to main July 12, 2022 17:41
@jaidisido jaidisido changed the base branch from main to release-3.0.0 July 12, 2022 17:41
@malachi-constant
Copy link
Contributor

AWS CodeBuild CI Report

  • CodeBuild project: GithubDistributedCodeBuildA-mOAe7SRBXVyY
  • Commit ID: a237185
  • Result: FAILED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

@malachi-constant
Copy link
Contributor

AWS CodeBuild CI Report

  • CodeBuild project: GithubDistributedCodeBuildA-mOAe7SRBXVyY
  • Commit ID: 013cac5
  • Result: FAILED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

@malachi-constant
Copy link
Contributor

AWS CodeBuild CI Report

  • CodeBuild project: GithubDistributedCodeBuildA-mOAe7SRBXVyY
  • Commit ID: 5c3537b
  • Result: FAILED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

@@ -532,6 +532,20 @@ def pyarrow_types_from_pandas( # pylint: disable=too-many-branches
return columns_types


def pyarrow2pandas_defaults(use_threads: Union[bool, int], kwargs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

futures.append(self._exec.apply_async(func, arg))
return [f.get() for f in futures]
# Discard boto3.Session object & return futures
return list(func(*arg) for arg in zip(itertools.repeat(None), *args))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you're essentially using naive .remote() implementation instead of the pool just wrapping it in the same interface...

Considering the issues we've had with the Pool so far I think this might be the way to go, the only thing is that I wanted pool to handle the results & avoid having to call ray.get if config.distributed outside thus moving as much ray machinery as possible into this class for the benefit of SOC.

This would require a feeding all tasks into the pool at the same time similar to what was done here. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - as I said I am not a fan of having to call a wrapper to just do a ray.get and also to flatten a list, just couldn't see a way around it for this use case where we have two ray remote methods (path and scan range)

return _arrow_refs_to_df(arrow_refs=tables, kwargs=kwargs) # type: ignore


def list_to_arrow_table(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to keep this version of the function with the schema handling? this is just doing the minimum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it does a bit more than what we need it to do right now. Happy to limit it, just couldn't see the harm in expanding it a bit either

awswrangler/_utils.py Outdated Show resolved Hide resolved
awswrangler/s3/_select.py Outdated Show resolved Hide resolved
return _arrow_refs_to_df(arrow_refs=tables, kwargs=kwargs) # type: ignore


def list_to_arrow_table(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it does a bit more than what we need it to do right now. Happy to limit it, just couldn't see the harm in expanding it a bit either

futures.append(self._exec.apply_async(func, arg))
return [f.get() for f in futures]
# Discard boto3.Session object & return futures
return list(func(*arg) for arg in zip(itertools.repeat(None), *args))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - as I said I am not a fan of having to call a wrapper to just do a ray.get and also to flatten a list, just couldn't see a way around it for this use case where we have two ray remote methods (path and scan range)

@jaidisido jaidisido merged commit c6cf0e6 into release-3.0.0 Jul 15, 2022
@jaidisido jaidisido deleted the feat-3.0/distributed-s3-select branch July 18, 2022 08:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants